IGNITE-1631: IGFS: Fixed append operation with [create=true] for DUAL modes. This closes #580.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1995531 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1995531 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1995531 Branch: refs/heads/ignite-2004 Commit: f199553142ea5a459904f9e7bcef5e265b4e5d5c Parents: bbf87df Author: vozerov-gridgain <[email protected]> Authored: Thu Mar 31 13:59:11 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Mar 31 13:59:11 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/igfs/IgfsImpl.java | 4 +- .../processors/igfs/IgfsMetaManager.java | 315 +++++++++++-------- .../processors/igfs/IgfsAbstractSelfTest.java | 3 - 3 files changed, 183 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f1995531/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 358aaf0..ec3a45e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -96,13 +96,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; -import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import 
static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -1072,7 +1070,7 @@ public final class IgfsImpl implements IgfsEx { await(path); - IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize); + IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize, create); batch = newBatch(path, desc.out()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1995531/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index a4212ba..7b1d68c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T1; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; @@ -514,11 +515,11 @@ public class IgfsMetaManager extends IgfsManager { * Lock the file explicitly outside of transaction. * * @param fileId File ID to lock. 
- * @param delete If file is being locked for delete. + * @param del If file is being locked for delete. * @return Locked file info or {@code null} if file cannot be locked or doesn't exist. * @throws IgniteCheckedException If the file with such id does not exist, or on another failure. */ - public @Nullable IgfsEntryInfo lock(IgniteUuid fileId, boolean delete) throws IgniteCheckedException { + public @Nullable IgfsEntryInfo lock(IgniteUuid fileId, boolean del) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { validTxState(false); @@ -535,7 +536,7 @@ public class IgfsMetaManager extends IgfsManager { if (oldInfo.lockId() != null) return null; // The file is already locked, we cannot lock it. - IgfsEntryInfo newInfo = invokeLock(fileId, delete); + IgfsEntryInfo newInfo = invokeLock(fileId, del); tx.commit(); @@ -556,11 +557,11 @@ public class IgfsMetaManager extends IgfsManager { /** * Create file lock ID. * - * @param delete If lock ID is required for file deletion. + * @param del If lock ID is required for file deletion. * @return Lock ID. */ - private IgniteUuid createFileLockId(boolean delete) { - if (delete) + private IgniteUuid createFileLockId(boolean del) { + if (del) return IgfsUtils.DELETE_LOCK_ID; return IgniteUuid.fromUuid(locNode.id()); @@ -1736,12 +1737,12 @@ public class IgfsMetaManager extends IgfsManager { * Invoke lock processor. * * @param id File ID. - * @param delete Whether lock is taken for delete. + * @param del Whether lock is taken for delete. * @return Resulting file info. * @throws IgniteCheckedException If failed. 
*/ - private IgfsEntryInfo invokeLock(IgniteUuid id, boolean delete) throws IgniteCheckedException { - return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(delete))); + private IgfsEntryInfo invokeLock(IgniteUuid id, boolean del) throws IgniteCheckedException { + return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(del))); } /** @@ -1786,7 +1787,7 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Create the file in DUAL mode. + * A delegate method that performs file creation in the synchronization task. * * @param fs File system. * @param path Path. @@ -1797,134 +1798,162 @@ public class IgfsMetaManager extends IgfsManager { * @param replication Replication factor. * @param blockSize Block size. * @param affKey Affinity key. + * @param infos Map from paths to corresponding infos. + * @param pendingEvts A non-null collection the events are to be accumulated in. + * @param t1 A single-object tuple to hold the created output stream. * @return Output stream descriptor. - * @throws IgniteCheckedException If file creation failed. + * @throws Exception On error. 
*/ - public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs, - final IgfsPath path, - final boolean simpleCreate, - @Nullable final Map<String, String> props, - final boolean overwrite, - final int bufSize, - final short replication, - final long blockSize, - final IgniteUuid affKey) - throws IgniteCheckedException - { - if (busyLock.enterBusy()) { - try { - assert fs != null; - assert path != null; + IgfsSecondaryOutputStreamDescriptor onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path, + boolean simpleCreate, @Nullable final Map<String, String> props, boolean overwrite, + int bufSize, short replication, long blockSize, IgniteUuid affKey, Map<IgfsPath, IgfsEntryInfo> infos, + final Deque<IgfsEvent> pendingEvts, final T1<OutputStream> t1) throws Exception { + validTxState(true); - // Events to fire (can be done outside of a transaction). - final Deque<IgfsEvent> pendingEvts = new LinkedList<>(); + assert !infos.isEmpty(); - SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task = - new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() { - /** Output stream to the secondary file system. */ - private OutputStream out; + // Determine the first existing parent. + IgfsPath parentPath = null; - @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, - IgfsEntryInfo> infos) throws Exception { - validTxState(true); + for (IgfsPath curPath : infos.keySet()) { + if (parentPath == null || curPath.isSubDirectoryOf(parentPath)) + parentPath = curPath; + } - assert !infos.isEmpty(); + assert parentPath != null; - // Determine the first existing parent. - IgfsPath parentPath = null; + IgfsEntryInfo parentInfo = infos.get(parentPath); - for (IgfsPath curPath : infos.keySet()) { - if (parentPath == null || curPath.isSubDirectoryOf(parentPath)) - parentPath = curPath; - } + // Delegate to the secondary file system. + OutputStream out = simpleCreate ? 
fs.create(path, overwrite) : + fs.create(path, bufSize, overwrite, replication, blockSize, props); - assert parentPath != null; + t1.set(out); - IgfsEntryInfo parentInfo = infos.get(parentPath); + IgfsPath parent0 = path.parent(); - // Delegate to the secondary file system. - out = simpleCreate ? fs.create(path, overwrite) : - fs.create(path, bufSize, overwrite, replication, blockSize, props); + assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path; - IgfsPath parent0 = path.parent(); + // If some of the parent directories were missing, synchronize again. + if (!parentPath.equals(parent0)) { + parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null); - assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path; + // Fire notification about missing directories creation. + if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) { + IgfsPath evtPath = parent0; - // If some of the parent directories were missing, synchronize again. - if (!parentPath.equals(parent0)) { - parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null); + while (!parentPath.equals(evtPath)) { + pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, + EventType.EVT_IGFS_DIR_CREATED)); - // Fire notification about missing directories creation. - if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) { - IgfsPath evtPath = parent0; + evtPath = evtPath.parent(); - while (!parentPath.equals(evtPath)) { - pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, - EventType.EVT_IGFS_DIR_CREATED)); + assert evtPath != null; // If this fails, then ROOT does not exist. + } + } + } - evtPath = evtPath.parent(); + // Get created file info. + IgfsFile status = fs.info(path); - assert evtPath != null; // If this fails, then ROOT does not exist. 
- } - } - } + if (status == null) + throw fsException("Failed to open output stream to the file created in " + + "the secondary file system because it no longer exists: " + path); + else if (status.isDirectory()) + throw fsException("Failed to open output stream to the file created in " + + "the secondary file system because the path points to a directory: " + path); - // Get created file info. - IgfsFile status = fs.info(path); + IgfsEntryInfo newInfo = IgfsUtils.createFile( + IgniteUuid.randomUuid(), + status.blockSize(), + status.length(), + affKey, + createFileLockId(false), + igfsCtx.igfs().evictExclude(path, false), + status.properties(), + status.accessTime(), + status.modificationTime() + ); - if (status == null) - throw fsException("Failed to open output stream to the file created in " + - "the secondary file system because it no longer exists: " + path); - else if (status.isDirectory()) - throw fsException("Failed to open output stream to the file created in " + - "the secondary file system because the path points to a directory: " + path); + // Add new file info to the listing optionally removing the previous one. + assert parentInfo != null; - IgfsEntryInfo newInfo = IgfsUtils.createFile( - IgniteUuid.randomUuid(), - status.blockSize(), - status.length(), - affKey, - createFileLockId(false), - igfsCtx.igfs().evictExclude(path, false), - status.properties(), - status.accessTime(), - status.modificationTime() - ); + IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo); - // Add new file info to the listing optionally removing the previous one. - assert parentInfo != null; + if (oldId != null) { + IgfsEntryInfo oldInfo = info(oldId); - IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo); + assert oldInfo != null; // Otherwise cache is in inconsistent state. 
- if (oldId != null) { - IgfsEntryInfo oldInfo = info(oldId); + // The contract is that we cannot overwrite a file locked for writing: + if (oldInfo.lockId() != null) + throw fsException("Failed to overwrite file (file is opened for writing) [path=" + + path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']'); - assert oldInfo != null; // Otherwise cache is in inconsistent state. + id2InfoPrj.remove(oldId); // Remove the old one. + id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor( + path.name(), parentInfo.listing().get(path.name()).fileId())); - // The contact is that we cannot overwrite a file locked for writing: - if (oldInfo.lockId() != null) - throw fsException("Failed to overwrite file (file is opened for writing) [path=" + - path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']'); + createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one. - id2InfoPrj.remove(oldId); // Remove the old one. - id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor( - path.name(), parentInfo.listing().get(path.name()).fileId())); + igfsCtx.data().delete(oldInfo); + } - createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one. + // Record CREATE event if needed. + if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED)) + pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED)); - igfsCtx.data().delete(oldInfo); - } + return new IgfsSecondaryOutputStreamDescriptor(newInfo, out); + } - // Record CREATE event if needed. - if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED)) - pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED)); + /** + * Create the file in DUAL mode. + * + * @param fs File system. + * @param path Path. + * @param simpleCreate "Simple create" flag. + * @param props Properties. + * @param overwrite Overwrite flag. + * @param bufSize Buffer size. 
+ * @param replication Replication factor. + * @param blockSize Block size. + * @param affKey Affinity key. + * @return Output stream descriptor. + * @throws IgniteCheckedException If file creation failed. + */ + public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs, + final IgfsPath path, + final boolean simpleCreate, + @Nullable final Map<String, String> props, + final boolean overwrite, + final int bufSize, + final short replication, + final long blockSize, + final IgniteUuid affKey) + throws IgniteCheckedException + { + if (busyLock.enterBusy()) { + try { + assert fs != null; + assert path != null; + + // Events to fire (can be done outside of a transaction). + final Deque<IgfsEvent> pendingEvts = new LinkedList<>(); + + SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task = + new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() { + /** Container for the secondary file system output stream. */ + private final T1<OutputStream> outT1 = new T1<>(null); - return new IgfsSecondaryOutputStreamDescriptor(newInfo, out); + @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, + IgfsEntryInfo> infos) throws Exception { + return onSuccessCreate(fs, path, simpleCreate, props, + overwrite, bufSize, replication, blockSize, affKey, infos, pendingEvts, outT1); } @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException { - U.closeQuiet(out); + U.closeQuiet(outT1.get()); U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" + simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" + @@ -1957,20 +1986,24 @@ public class IgfsMetaManager extends IgfsManager { * @param fs File system. * @param path Path. * @param bufSize Buffer size. + * @param create Create flag. * @return Output stream descriptor. * @throws IgniteCheckedException If output stream open for append has failed. 
*/ - public IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, - final int bufSize) throws IgniteCheckedException { + IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, + final int bufSize, final boolean create) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { assert fs != null; assert path != null; + // Events to fire (can be done outside of a transaction). + final Deque<IgfsEvent> pendingEvts = new LinkedList<>(); + SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task = new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() { - /** Output stream to the secondary file system. */ - private OutputStream out; + /** Container for the secondary file system output stream. */ + private final T1<OutputStream> outT1 = new T1<>(null); @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception { @@ -1978,55 +2011,71 @@ public class IgfsMetaManager extends IgfsManager { final IgfsEntryInfo info = infos.get(path); - if (info.isDirectory()) - throw fsException("Failed to open output stream to the file in the " + - "secondary file system because the path points to a directory: " + path); + final IgfsEntryInfo lockedInfo; - out = fs.append(path, bufSize, false, null); + if (info == null) + return onSuccessCreate(fs, path, true/*simpleCreate*/, null, + false/*overwrite*/, bufSize, (short)0, 0, null, infos, pendingEvts, outT1); + else { + if (info.isDirectory()) + throw fsException("Failed to open output stream to the file in the " + + "secondary file system because the path points to a directory: " + path); + + outT1.set(fs.append(path, bufSize, false, null)); - // Synchronize file ending. - long len = info.length(); - int blockSize = info.blockSize(); + // Synchronize file ending. 
+ long len = info.length(); + int blockSize = info.blockSize(); - int remainder = (int)(len % blockSize); + int remainder = (int) (len % blockSize); - if (remainder > 0) { - int blockIdx = (int)(len / blockSize); + if (remainder > 0) { + int blockIdx = (int) (len / blockSize); - try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) { - IgniteInternalFuture<byte[]> fut = - igfsCtx.data().dataBlock(info, path, blockIdx, reader); + try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) { + IgniteInternalFuture<byte[]> fut = + igfsCtx.data().dataBlock(info, path, blockIdx, reader); - assert fut != null; + assert fut != null; - fut.get(); + fut.get(); + } } - } - if (info.lockId() != null) { - throw fsException("Failed to open file (file is opened for writing) [path=" + - path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']'); + if (info.lockId() != null) { + throw fsException("Failed to open file (file is opened for writing) [path=" + + path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']'); + } + + // Set lock and return. + lockedInfo = invokeLock(info.id(), false); } - // Set lock and return. 
- IgfsEntryInfo lockedInfo = invokeLock(info.id(), false); + if (evts.isRecordable(EventType.EVT_IGFS_FILE_OPENED_WRITE)) + pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_OPENED_WRITE)); - return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, out); + return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, outT1.get()); } @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err) throws IgniteCheckedException { - U.closeQuiet(out); + U.closeQuiet(outT1.get()); U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize + ']', err); throw new IgniteCheckedException("Failed to append to the file due to secondary file " + "system exception: " + path, err); - } - }; + } + }; - return synchronizeAndExecute(task, fs, true, path); + try { + return synchronizeAndExecute(task, fs, !create/*strict*/, path); + } + finally { + for (IgfsEvent evt : pendingEvts) + evts.record(evt); + } } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1995531/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index b361d42..e4d39d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -1543,9 +1543,6 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { * @throws Exception If failed. 
*/ public void testAppend() throws Exception { - if (dual) - fail("Test fails in DUAL modes, see https://issues.apache.org/jira/browse/IGNITE-1631"); - create(igfs, paths(DIR, SUBDIR), null); assert igfs.exists(SUBDIR);
