Repository: ignite Updated Branches: refs/heads/master 41b742cd6 -> 54fb41517
IGNITE-3858 IGFS: Support direct PROXY mode invocation in methods: create / append. This closes #1070. This closes #1084. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a97483a4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a97483a4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a97483a4 Branch: refs/heads/master Commit: a97483a4ce2c00bd0cca025c4ef4bfa181897aa9 Parents: 0d5ee78 Author: tledkov-gridgain <[email protected]> Authored: Thu Sep 22 10:51:05 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Sep 22 10:51:05 2016 +0300 ---------------------------------------------------------------------- .../igfs/IgfsAbstractOutputStream.java | 266 ++++++++++++++++ .../internal/processors/igfs/IgfsImpl.java | 27 +- .../processors/igfs/IgfsOutputStreamImpl.java | 319 ++++--------------- .../igfs/IgfsOutputStreamProxyImpl.java | 163 ++++++++++ .../igfs/IgfsAbstractBaseSelfTest.java | 2 +- 5 files changed, 518 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java new file mode 100644 index 0000000..c1e751e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.events.IgfsEvent; +import org.apache.ignite.igfs.IgfsOutputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE; + +/** + * Output stream to store data into grid cache with separate blocks. + */ +abstract class IgfsAbstractOutputStream extends IgfsOutputStream { + /** IGFS context. */ + protected final IgfsContext igfsCtx; + + /** Path to file. */ + protected final IgfsPath path; + + /** Buffer size. */ + protected final int bufSize; + + /** File worker batch. */ + protected final IgfsFileWorkerBatch batch; + + /** Mutex for synchronization. */ + protected final Object mux = new Object(); + + /** Flag for this stream open/closed state. */ + protected boolean closed; + + /** Local buffer to store stream data as consistent block. */ + protected ByteBuffer buf; + + /** Bytes written. */ + protected long bytes; + + /** Time consumed by write operations. 
*/ + protected long time; + + /** + * Constructs file output stream. + * + * @param igfsCtx IGFS context. + * @param path Path to stored file. + * @param bufSize The size of the buffer to be used. + * @param batch Optional secondary file system batch. + */ + IgfsAbstractOutputStream(IgfsContext igfsCtx, IgfsPath path, int bufSize, @Nullable IgfsFileWorkerBatch batch) { + synchronized (mux) { + this.path = path; + this.bufSize = optimizeBufferSize(bufSize); + this.igfsCtx = igfsCtx; + this.batch = batch; + } + + igfsCtx.metrics().incrementFilesOpenedForWrite(); + } + + /** + * Optimize buffer size. + * + * @param bufSize Original buffer size. + * @return Optimized buffer size. + */ + protected abstract int optimizeBufferSize(int bufSize); + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + synchronized (mux) { + checkClosed(null, 0); + + b &= 0xFF; + + long startTime = System.nanoTime(); + + if (buf == null) + buf = allocateNewBuffer(); + + buf.put((byte)b); + + sendBufferIfFull(); + + time += System.nanoTime() - startTime; + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public void write(byte[] b, int off, int len) throws IOException { + A.notNull(b, "b"); + + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off + + ", length=" + len + ']'); + } + + synchronized (mux) { + checkClosed(null, 0); + + // Check if there is anything to write. + if (len == 0) + return; + + long startTime = System.nanoTime(); + + if (buf == null) { + if (len >= bufSize) { + // Send data right away. + ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len); + + send(tmpBuf, tmpBuf.remaining()); + } + else { + buf = allocateNewBuffer(); + + buf.put(b, off, len); + } + } + else { + // Re-allocate buffer if needed. 
+ if (buf.remaining() < len) + buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip()); + + buf.put(b, off, len); + + sendBufferIfFull(); + } + + time += System.nanoTime() - startTime; + } + } + + /** {@inheritDoc} */ + @Override public void transferFrom(DataInput in, int len) throws IOException { + synchronized (mux) { + checkClosed(in, len); + + long startTime = System.nanoTime(); + + // Clean-up local buffer before streaming. + sendBufferIfNotEmpty(); + + // Perform transfer. + send(in, len); + + time += System.nanoTime() - startTime; + } + } + + /** + * Validate this stream is open. + * + * @param in Data input. + * @param len Data len in bytes. + * @throws IOException If this stream is closed. + */ + protected void checkClosed(@Nullable DataInput in, int len) throws IOException { + assert Thread.holdsLock(mux); + + if (closed) { + // Must read data from stream before throwing exception. + if (in != null) + in.skipBytes(len); + + throw new IOException("Stream has been closed: " + this); + } + } + + /** + * Send local buffer if it is full. + * + * @throws IOException If failed. + */ + private void sendBufferIfFull() throws IOException { + if (buf.position() >= bufSize) + sendBuffer(); + } + + /** + * Send local buffer if at least something is stored there. + * + * @throws IOException If failed. + */ + void sendBufferIfNotEmpty() throws IOException { + if (buf != null && buf.position() > 0) + sendBuffer(); + } + + /** + * Send all local-buffered data to server. + * + * @throws IOException In case of IO exception. + */ + private void sendBuffer() throws IOException { + buf.flip(); + + send(buf, buf.remaining()); + + buf = null; + } + + /** + * Store data block. + * + * @param data Block. + * @param writeLen Write length. + * @throws IOException If failed. + */ + protected abstract void send(Object data, int writeLen) throws IOException; + + /** + * Allocate new buffer. + * + * @return New buffer. 
+ */ + private ByteBuffer allocateNewBuffer() { + return ByteBuffer.allocate(bufSize); + } + + /** + * Updates IGFS metrics when the stream is closed. + */ + protected void updateMetricsOnClose() { + IgfsLocalMetrics metrics = igfsCtx.metrics(); + + metrics.addWrittenBytesTime(bytes, time); + metrics.decrementFilesOpenedForWrite(); + + GridEventStorageManager evts = igfsCtx.kernalContext().event(); + + if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE)) + evts.record(new IgfsEvent(path, igfsCtx.localNode(), + EVT_IGFS_FILE_CLOSED_WRITE, bytes)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsAbstractOutputStream.class, this); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 87a4699..bee9d9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -92,7 +92,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -274,7 +274,7 @@ public final class IgfsImpl implements IgfsEx { } dualPool = secondaryFs != null ? 
new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, - new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; + new SynchronousQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; } /** {@inheritDoc} */ @@ -1088,6 +1088,17 @@ public final class IgfsImpl implements IgfsEx { else dirProps = fileProps = new HashMap<>(props); + if (mode == PROXY) { + assert secondaryFs != null; + + OutputStream secondaryStream = secondaryFs.create(path, bufSize, overwrite, replication, + groupBlockSize(), props); + + IgfsFileWorkerBatch batch = newBatch(path, secondaryStream); + + return new IgfsOutputStreamProxyImpl(igfsCtx, path, info(path), bufferSize(bufSize), batch); + } + // Prepare context for DUAL mode. IgfsSecondaryFileSystemCreateContext secondaryCtx = null; @@ -1142,7 +1153,15 @@ public final class IgfsImpl implements IgfsEx { final IgfsMode mode = resolveMode(path); - IgfsFileWorkerBatch batch; + if (mode == PROXY) { + assert secondaryFs != null; + + OutputStream secondaryStream = secondaryFs.append(path, bufSize, create, props); + + IgfsFileWorkerBatch batch = newBatch(path, secondaryStream); + + return new IgfsOutputStreamProxyImpl(igfsCtx, path, info(path), bufferSize(bufSize), batch); + } if (mode != PRIMARY) { assert IgfsUtils.isDualMode(mode); @@ -1151,7 +1170,7 @@ public final class IgfsImpl implements IgfsEx { IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize, create); - batch = newBatch(path, desc.secondaryOutputStream()); + IgfsFileWorkerBatch batch = newBatch(path, desc.secondaryOutputStream()); return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java index 6dec0c1..f976242 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java @@ -18,14 +18,10 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; -import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -35,7 +31,6 @@ import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE; import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; import static org.apache.ignite.igfs.IgfsMode.PRIMARY; import static org.apache.ignite.igfs.IgfsMode.PROXY; @@ -43,57 +38,30 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY; /** * Output stream to store data into grid cache with separate blocks. */ -class IgfsOutputStreamImpl extends IgfsOutputStream { +class IgfsOutputStreamImpl extends IgfsAbstractOutputStream { /** Maximum number of blocks in buffer. */ private static final int MAX_BLOCKS_CNT = 16; - /** IGFS context. */ - private final IgfsContext igfsCtx; - - /** Path to file. */ - private final IgfsPath path; - - /** Buffer size. 
*/ - private final int bufSize; - /** IGFS mode. */ private final IgfsMode mode; - /** File worker batch. */ - private final IgfsFileWorkerBatch batch; - - /** Mutex for synchronization. */ - private final Object mux = new Object(); - /** Write completion future. */ private final IgniteInternalFuture<Boolean> writeFut; - /** Flag for this stream open/closed state. */ - private boolean closed; - - /** Local buffer to store stream data as consistent block. */ - private ByteBuffer buf; - - /** Bytes written. */ - private long bytes; - - /** Time consumed by write operations. */ - private long time; - /** File descriptor. */ private IgfsEntryInfo fileInfo; - /** Space in file to write data. */ - private long space; + /** Affinity written by this output stream. */ + private IgfsFileAffinityRange streamRange; + + /** Data length in remainder. */ + protected int remainderDataLen; /** Intermediate remainder to keep data. */ private byte[] remainder; - /** Data length in remainder. */ - private int remainderDataLen; - - /** Affinity written by this output stream. */ - private IgfsFileAffinityRange streamRange; + /** Space in file to write data. */ + protected long space; /** * Constructs file output stream. 
@@ -107,6 +75,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { */ IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch) { + super(igfsCtx, path, bufSize, batch); + assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo; assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null); @@ -115,108 +85,55 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path); synchronized (mux) { - this.path = path; - this.bufSize = optimizeBufferSize(bufSize, fileInfo); - this.igfsCtx = igfsCtx; this.fileInfo = fileInfo; this.mode = mode; - this.batch = batch; streamRange = initialStreamRange(fileInfo); writeFut = igfsCtx.data().writeStart(fileInfo.id()); } - - igfsCtx.metrics().incrementFilesOpenedForWrite(); } - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - synchronized (mux) { - checkClosed(null, 0); - - b &= 0xFF; - - long startTime = System.nanoTime(); - - if (buf == null) - buf = allocateNewBuffer(); - - buf.put((byte)b); - - sendBufferIfFull(); - - time += System.nanoTime() - startTime; - } + /** + * @return Length of file. + */ + private long length() { + return fileInfo.length(); } /** {@inheritDoc} */ - @SuppressWarnings("NullableProblems") - @Override public void write(byte[] b, int off, int len) throws IOException { - A.notNull(b, "b"); - - if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off + - ", length=" + len + ']'); - } - - synchronized (mux) { - checkClosed(null, 0); - - // Check if there is anything to write. 
- if (len == 0) - return; - - long startTime = System.nanoTime(); - - if (buf == null) { - if (len >= bufSize) { - // Send data right away. - ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len); - - send(tmpBuf, tmpBuf.remaining()); - } - else { - buf = allocateNewBuffer(); - - buf.put(b, off, len); - } - } - else { - // Re-allocate buffer if needed. - if (buf.remaining() < len) - buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip()); + @Override protected int optimizeBufferSize(int bufSize) { + assert bufSize > 0; - buf.put(b, off, len); + if (fileInfo == null) + return bufSize; - sendBufferIfFull(); - } + int blockSize = fileInfo.blockSize(); - time += System.nanoTime() - startTime; - } - } + if (blockSize <= 0) + return bufSize; - /** {@inheritDoc} */ - @Override public void transferFrom(DataInput in, int len) throws IOException { - synchronized (mux) { - checkClosed(in, len); + if (bufSize <= blockSize) + // Optimize minimum buffer size to be equal file's block size. + return blockSize; - long startTime = System.nanoTime(); + int maxBufSize = blockSize * MAX_BLOCKS_CNT; - // Clean-up local buffer before streaming. - sendBufferIfNotEmpty(); + if (bufSize > maxBufSize) + // There is no profit or optimization from larger buffers. + return maxBufSize; - // Perform transfer. - send(in, len); + if (fileInfo.length() == 0) + // Make buffer size multiple of block size (optimized for new files). + return bufSize / blockSize * blockSize; - time += System.nanoTime() - startTime; - } + return bufSize; } /** * Flushes this output stream and forces any buffered output bytes to be written out. * - * @exception IOException if an I/O error occurs. + * @throws IOException if an I/O error occurs. */ @Override public void flush() throws IOException { synchronized (mux) { @@ -250,40 +167,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { } } - /** - * Await acknowledgments. - * - * @throws IOException If failed. 
- */ - private void awaitAcks() throws IOException { - try { - igfsCtx.data().awaitAllAcksReceived(fileInfo.id()); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e); - } - } - - /** - * Flush remainder. - * - * @throws IOException If failed. - */ - private void flushRemainder() throws IOException { - try { - if (remainder != null) { - igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0, - ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch); - - remainder = null; - remainderDataLen = 0; - } - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e); - } - } - /** {@inheritDoc} */ @Override public final void close() throws IOException { synchronized (mux) { @@ -355,75 +238,33 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { if (err != null) throw err; - igfsCtx.metrics().addWrittenBytesTime(bytes, time); - igfsCtx.metrics().decrementFilesOpenedForWrite(); - - GridEventStorageManager evts = igfsCtx.kernalContext().event(); - - if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE)) - evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(), - EVT_IGFS_FILE_CLOSED_WRITE, bytes)); - } - } - - /** - * Validate this stream is open. - * - * @throws IOException If this stream is closed. - */ - private void checkClosed(@Nullable DataInput in, int len) throws IOException { - assert Thread.holdsLock(mux); - - if (closed) { - // Must read data from stream before throwing exception. - if (in != null) - in.skipBytes(len); - - throw new IOException("Stream has been closed: " + this); + updateMetricsOnClose(); } } /** - * Send local buffer if it full. - * - * @throws IOException If failed. 
- */ - private void sendBufferIfFull() throws IOException { - if (buf.position() >= bufSize) - sendBuffer(); - } - - /** - * Send local buffer if at least something is stored there. + * Flush remainder. * * @throws IOException If failed. */ - private void sendBufferIfNotEmpty() throws IOException { - if (buf != null && buf.position() > 0) - sendBuffer(); - } - - /** - * Send all local-buffered data to server. - * - * @throws IOException In case of IO exception. - */ - private void sendBuffer() throws IOException { - buf.flip(); + private void flushRemainder() throws IOException { + try { + if (remainder != null) { - send(buf, buf.remaining()); + remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, null, + 0, ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch); - buf = null; + remainder = null; + remainderDataLen = 0; + } + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e); + } } - /** - * Store data block. - * - * @param data Block. - * @param writeLen Write length. - * @throws IOException If failed. 
- */ - private void send(Object data, int writeLen) throws IOException { + /** {@inheritDoc} */ + @Override protected void send(Object data, int writeLen) throws IOException { assert Thread.holdsLock(mux); assert data instanceof ByteBuffer || data instanceof DataInput; @@ -449,20 +290,20 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { } if (data instanceof ByteBuffer) - ((ByteBuffer) data).get(remainder, remainderDataLen, writeLen); + ((ByteBuffer)data).get(remainder, remainderDataLen, writeLen); else - ((DataInput) data).readFully(remainder, remainderDataLen, writeLen); + ((DataInput)data).readFully(remainder, remainderDataLen, writeLen); remainderDataLen += writeLen; } else { if (data instanceof ByteBuffer) { - remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, - remainderDataLen, (ByteBuffer) data, false, streamRange, batch); + remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, remainder, + remainderDataLen, (ByteBuffer)data, false, streamRange, batch); } else { - remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, - remainderDataLen, (DataInput) data, writeLen, false, streamRange, batch); + remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, remainder, + remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch); } remainderDataLen = remainder == null ? 0 : remainder.length; @@ -474,12 +315,17 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { } /** - * Allocate new buffer. + * Await acknowledgments. * - * @return New buffer. + * @throws IOException If failed. 
*/ - private ByteBuffer allocateNewBuffer() { - return ByteBuffer.allocate(bufSize); + private void awaitAcks() throws IOException { + try { + igfsCtx.data().awaitAllAcksReceived(fileInfo.id()); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e); + } } /** @@ -516,41 +362,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream { return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey); } - /** - * Optimize buffer size. - * - * @param bufSize Requested buffer size. - * @param fileInfo File info. - * @return Optimized buffer size. - */ - private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) { - assert bufSize > 0; - - if (fileInfo == null) - return bufSize; - - int blockSize = fileInfo.blockSize(); - - if (blockSize <= 0) - return bufSize; - - if (bufSize <= blockSize) - // Optimize minimum buffer size to be equal file's block size. - return blockSize; - - int maxBufSize = blockSize * MAX_BLOCKS_CNT; - - if (bufSize > maxBufSize) - // There is no profit or optimization from larger buffers. - return maxBufSize; - - if (fileInfo.length() == 0) - // Make buffer size multiple of block size (optimized for new files). 
- return bufSize / blockSize * blockSize; - - return bufSize; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgfsOutputStreamImpl.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java new file mode 100644 index 0000000..7b74a1f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Output stream that passes written data to the secondary file system batch (PROXY mode). + */ +class IgfsOutputStreamProxyImpl extends IgfsAbstractOutputStream { + /** File info. */ + private IgfsFile info; + + /** + * Constructs file output stream. + * + * @param igfsCtx IGFS context. + * @param path Path to stored file. + * @param info File info. + * @param bufSize The size of the buffer to be used. + * @param batch Optional secondary file system batch. + */ + IgfsOutputStreamProxyImpl(IgfsContext igfsCtx, IgfsPath path, IgfsFile info, int bufSize, + @Nullable IgfsFileWorkerBatch batch) { + super(igfsCtx, path, bufSize, batch); + + assert batch != null; + + this.info = info; + } + + /** {@inheritDoc} */ + @Override protected int optimizeBufferSize(int bufSize) { + assert bufSize > 0; + + return bufSize; + } + + /** + * Flushes this output stream and forces any buffered output bytes to be written out. + * + * @throws IOException if an I/O error occurs. + */ + @Override public void flush() throws IOException { + synchronized (mux) { + checkClosed(null, 0); + + sendBufferIfNotEmpty(); + } + } + + /** {@inheritDoc} */ + @Override public final void close() throws IOException { + synchronized (mux) { + // Do nothing if stream is already closed. + if (closed) + return; + + // Set closed flag immediately. + closed = true; + + // Flush data. 
+ IOException err = null; + + try { + sendBufferIfNotEmpty(); + } + catch (Exception e) { + err = new IOException("Failed to flush data during stream close [path=" + path + + ", fileInfo=" + info + ']', e); + } + + // Finish batch before file unlocking to support the assertion that unlocked file batch, + // if any, must be in finishing state (e.g. append see more IgfsImpl.newBatch) + batch.finish(); + + // Finally, await secondary file system flush. + try { + batch.await(); + } + catch (IgniteCheckedException e) { + if (err == null) + err = new IOException("Failed to close secondary file system stream [path=" + path + + ", fileInfo=" + info + ']', e); + else + err.addSuppressed(e); + } + + // Throw error, if any. + if (err != null) + throw err; + + updateMetricsOnClose(); + } + } + + /** {@inheritDoc} */ + @Override protected void send(Object data, int writeLen) throws IOException { + assert Thread.holdsLock(mux); + assert data instanceof ByteBuffer || data instanceof DataInput; + + try { + // Increment metrics. 
+ bytes += writeLen; + + byte [] dataBuf = new byte[writeLen]; + + if (data instanceof ByteBuffer) { + ByteBuffer byteBuf = (ByteBuffer)data; + + byteBuf.get(dataBuf); + } + else { + DataInput dataIn = (DataInput)data; + + try { + dataIn.readFully(dataBuf); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + if (!batch.write(dataBuf)) + throw new IgniteCheckedException("Cannot write more data to the secondary file system output " + + "stream because it was marked as closed: " + batch.path()); + else + igfsCtx.metrics().addWriteBlocks(1, 1); + + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to store data into file: " + path, e); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsOutputStreamProxyImpl.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java index 3f62cf5..14a653b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java @@ -904,7 +904,7 @@ public abstract class IgfsAbstractBaseSelfTest extends IgfsCommonAbstractTest { protected void clear(IgniteFileSystem igfs, IgfsSecondaryFileSystemTestAdapter igfsSecondary) throws Exception { clear(igfs); - if (dual) + if (mode != PRIMARY) clear(igfsSecondary); }
