IGNITE-2860: IGFS: Reworked base meta operations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebf40752 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebf40752 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebf40752 Branch: refs/heads/ignite-1786 Commit: ebf40752854cb12c5c6202ecb8546a0090482ad6 Parents: 3e53f17 Author: vozerov-gridgain <[email protected]> Authored: Fri Mar 18 16:38:45 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Mar 18 16:38:45 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/igfs/IgfsPath.java | 9 + .../internal/processors/igfs/IgfsImpl.java | 59 +- .../processors/igfs/IgfsMetaManager.java | 1058 +++++++++--------- .../internal/processors/igfs/IgfsPathIds.java | 291 +++++ .../processors/igfs/IgfsPathsCreateResult.java | 77 ++ .../internal/processors/igfs/IgfsUtils.java | 23 +- .../processors/igfs/IgfsAbstractSelfTest.java | 68 +- .../igfs/IgfsMetaManagerSelfTest.java | 31 +- .../processors/igfs/IgfsProcessorSelfTest.java | 26 +- .../processors/igfs/IgfsStartCacheTest.java | 9 +- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 10 +- 11 files changed, 1039 insertions(+), 622 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java index fb0621c..bbb4efb 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java @@ -159,6 +159,15 @@ public final class IgfsPath implements Comparable<IgfsPath>, Externalizable { } /** + * Get components in array form. + * + * @return Components array. + */ + public String[] componentsArray() { + return path.length() == 1 ? new String[0] : path.substring(1).split(SLASH); + } + + /** * Returns the parent of a path or {@code null} if at root. * * @return The parent of a path or {@code null} if at root. http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 3065427..9ec583c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -17,24 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -94,6 +76,25 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ; @@ -1033,8 +1034,15 @@ public final class IgfsImpl implements IgfsEx { else dirProps = fileProps = new HashMap<>(props); - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, false/*append*/, overwrite, dirProps, - cfg.getBlockSize(), affKey, evictExclude(path, true), fileProps); + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create( + path, + dirProps, + overwrite, + cfg.getBlockSize(), + affKey, + evictExclude(path, true), + fileProps + ); assert t2 != null; @@ -1104,8 +1112,15 @@ public final class IgfsImpl implements IgfsEx { else dirProps = fileProps = new HashMap<>(props); - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, true/*append*/, false/*overwrite*/, - dirProps, cfg.getBlockSize(), null/*affKey*/, evictExclude(path, true), fileProps); + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.append( + path, + dirProps, + create, + cfg.getBlockSize(), + null/*affKey*/, + evictExclude(path, true), + fileProps + ); assert t2 != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index b4774f2..d91b0bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -17,31 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; @@ -82,16 +57,36 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_CREATED; -import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED; -import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; + import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.builder; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Cache based structure (meta data) manager. @@ -334,6 +329,51 @@ public class IgfsMetaManager extends IgfsManager { } /** + * Gets all file IDs for components of specified path. Result cannot be empty - there is at least root element. + * But each element (except the first) can be {@code null} if such files don't exist. + * + * @param path Path. + * @return Collection of file IDs for components of specified path. + * @throws IgniteCheckedException If failed. + */ + public IgfsPathIds pathIds(IgfsPath path) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + validTxState(false); + + // Prepare parts. + String[] components = path.componentsArray(); + + String[] parts = new String[components.length + 1]; + + System.arraycopy(components, 0, parts, 1, components.length); + + // Prepare IDs. + IgniteUuid[] ids = new IgniteUuid[parts.length]; + + ids[0] = IgfsUtils.ROOT_ID; + + for (int i = 1; i < ids.length; i++) { + IgniteUuid id = fileId(ids[i - 1], parts[i], false); + + if (id != null) + ids[i] = id; + else + break; + } + + // Return. + return new IgfsPathIds(path, parts, ids); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path); + } + + /** * Gets all file IDs for components of specified path possibly skipping existing transaction. Result cannot * be empty - there is at least root element. But each element (except the first) can be {@code null} if such * files don't exist. @@ -823,107 +863,84 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - // 1. First get source and destination path IDs. - List<IgniteUuid> srcPathIds = fileIds(srcPath); - List<IgniteUuid> dstPathIds = fileIds(dstPath); + // Prepare path IDs. + IgfsPathIds srcPathIds = pathIds(srcPath); + IgfsPathIds dstPathIds = pathIds(dstPath); - final Set<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); + // Source path must exists. + if (!srcPathIds.allExists()) + throw new IgfsPathNotFoundException("Failed to perform move because source path is not " + + "found: " + srcPath); - allIds.addAll(srcPathIds); + // At this point we need to understand name of resulting entry. It will be either destination leaf + // or source leaf depending on existence. + String dstName; - final IgniteUuid dstLeafId = dstPathIds.get(dstPathIds.size() - 1); + if (dstPathIds.lastExists()) + // Full destination path exists -> use source name. + dstName = srcPathIds.lastPart(); + else { + if (dstPathIds.lastParentExists()) { + // Destination path doesn't exists -> use destination name. + dstName = dstPathIds.lastPart(); - if (dstLeafId == null) { - // Delete null entry for the unexisting destination element: - dstPathIds.remove(dstPathIds.size() - 1); + dstPathIds = dstPathIds.parent(); + } + else + // Destination parent is not found either -> exception. + throw new IgfsPathNotFoundException("Failed to perform move because destination path is not " + + "found: " + dstPath.parent()); } - allIds.addAll(dstPathIds); + // Lock participating IDs. + final Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); - if (allIds.remove(null)) { - throw new IgfsPathNotFoundException("Failed to perform move because some path component was " + - "not found. [src=" + srcPath + ", dst=" + dstPath + ']'); - } + srcPathIds.addExistingIds(lockIds); + dstPathIds.addExistingIds(lockIds); - // 2. Start transaction. IgniteInternalTx tx = startTx(); try { - // 3. Obtain the locks. - final Map<IgniteUuid, IgfsFileInfo> allInfos = lockIds(allIds); + // Obtain the locks. + final Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds); - // 4. Verify integrity of source directory. - if (!verifyPathIntegrity(srcPath, srcPathIds, allInfos)) { + // Verify integrity of source and destination paths. + if (!srcPathIds.verifyIntegrity(lockInfos)) throw new IgfsPathNotFoundException("Failed to perform move because source directory " + "structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']'); - } - - // 5. Verify integrity of destination directory. - final IgfsPath dstDirPath = dstLeafId != null ? dstPath : dstPath.parent(); - if (!verifyPathIntegrity(dstDirPath, dstPathIds, allInfos)) { + if (!dstPathIds.verifyIntegrity(lockInfos)) throw new IgfsPathNotFoundException("Failed to perform move because destination directory " + "structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']'); - } - - // 6. Calculate source and destination targets which will be changed. - IgniteUuid srcTargetId = srcPathIds.get(srcPathIds.size() - 2); - IgfsFileInfo srcTargetInfo = allInfos.get(srcTargetId); - String srcName = srcPath.name(); - - IgniteUuid dstTargetId; - IgfsFileInfo dstTargetInfo; - String dstName; - if (dstLeafId != null) { - // Destination leaf exists. Check if it is an empty directory. - IgfsFileInfo dstLeafInfo = allInfos.get(dstLeafId); - - assert dstLeafInfo != null; - - if (dstLeafInfo.isDirectory()) { - // Destination is a directory. - dstTargetId = dstLeafId; - dstTargetInfo = dstLeafInfo; - dstName = srcPath.name(); - } - else { - // Error, destination is existing file. - throw new IgfsPathAlreadyExistsException("Failed to perform move " + - "because destination points to " + - "existing file [src=" + srcPath + ", dst=" + dstPath + ']'); - } - } - else { - // Destination leaf doesn't exist, so we operate on parent. - dstTargetId = dstPathIds.get(dstPathIds.size() - 1); - dstTargetInfo = allInfos.get(dstTargetId); - dstName = dstPath.name(); - } + // Addiional check: is destination directory? + IgfsFileInfo dstParentInfo = lockInfos.get(dstPathIds.lastId()); - assert dstTargetInfo != null; - assert dstTargetInfo.isDirectory(); + if (dstParentInfo.isFile()) + throw new IgfsPathAlreadyExistsException("Failed to perform move because destination points " + + "to existing file [src=" + srcPath + ", dst=" + dstPath + ']'); - // 7. Last check: does destination target already have listing entry with the same name? - if (dstTargetInfo.hasChild(dstName)) { + // Additional check: does destination already has child with the same name? + if (dstParentInfo.hasChild(dstName)) throw new IgfsPathAlreadyExistsException("Failed to perform move because destination already " + "contains entry with the same name existing file [src=" + srcPath + ", dst=" + dstPath + ']'); - } - // 8. Actual move: remove from source parent and add to destination target. - IgfsListingEntry entry = srcTargetInfo.listing().get(srcName); + // Actual move: remove from source parent and add to destination target. + IgfsFileInfo srcParentInfo = lockInfos.get(srcPathIds.lastParentId()); - transferEntry(entry, srcTargetId, srcName, dstTargetId, dstName); + IgfsFileInfo srcInfo = lockInfos.get(srcPathIds.lastId()); + String srcName = srcPathIds.lastPart(); + IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcName); - tx.commit(); + transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName); - IgfsPath realNewPath = new IgfsPath(dstDirPath, dstName); + tx.commit(); - IgfsFileInfo moved = allInfos.get(srcPathIds.get(srcPathIds.size() - 1)); + IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName); // Set the new path to the info to simplify event creation: - return IgfsFileInfo.builder(moved).path(realNewPath).build(); + return IgfsFileInfo.builder(srcInfo).path(newPath).build(); } finally { tx.close(); @@ -1117,72 +1134,57 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - final SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); + IgfsPathIds pathIds = pathIds(path); - List<IgniteUuid> pathIdList = fileIds(path); - - assert pathIdList.size() > 1; + // Continue only if the whole path present. + if (!pathIds.allExists()) + return null; // A fragment of the path no longer exists. - final IgniteUuid victimId = pathIdList.get(pathIdList.size() - 1); + IgniteUuid victimId = pathIds.lastId(); + String victimName = pathIds.lastPart(); - assert !IgfsUtils.isRootOrTrashId(victimId) : "Cannot delete root or trash directories."; + if (IgfsUtils.isRootId(victimId)) + throw new IgfsException("Cannot remove root directory"); - allIds.addAll(pathIdList); + // Prepare IDs to lock. + SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); - if (allIds.remove(null)) - return null; // A fragment of the path no longer exists. + pathIds.addExistingIds(allIds); IgniteUuid trashId = IgfsUtils.randomTrashId(); - boolean added = allIds.add(trashId); - assert added; + allIds.add(trashId); - final IgniteInternalTx tx = startTx(); + IgniteInternalTx tx = startTx(); try { - final Map<IgniteUuid, IgfsFileInfo> infoMap = lockIds(allIds); + // Lock participants. + Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(allIds); - // Directory starure was changed concurrently, so the original path no longer exists: - if (!verifyPathIntegrity(path, pathIdList, infoMap)) + // Ensure that all participants are still in place. + if (!pathIds.verifyIntegrity(lockInfos)) return null; - final IgfsFileInfo victimInfo = infoMap.get(victimId); + IgfsFileInfo victimInfo = lockInfos.get(victimId); + // Cannot delete non-empty directory if recursive flag is not set. if (!recursive && victimInfo.hasChildren()) - // Throw exception if not empty and not recursive. throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " + "empty and recursive flag is not set)."); - IgfsFileInfo destInfo = infoMap.get(trashId); - - assert destInfo != null; + // Prepare trash data. + IgfsFileInfo trashInfo = lockInfos.get(trashId); + final String trashName = victimId.toString(); - final String srcFileName = path.name(); + assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " + + "destination directory (file already exists) [destName=" + trashName + ']'; - final String destFileName = victimId.toString(); + IgniteUuid parentId = pathIds.lastParentId(); + IgfsFileInfo parentInfo = lockInfos.get(parentId); - assert !destInfo.hasChild(destFileName) : "Failed to add file name into the " + - "destination directory (file already exists) [destName=" + destFileName + ']'; - - IgfsFileInfo srcParentInfo = infoMap.get(pathIdList.get(pathIdList.size() - 2)); - - assert srcParentInfo != null; - - IgniteUuid srcParentId = srcParentInfo.id(); - assert srcParentId.equals(pathIdList.get(pathIdList.size() - 2)); - - IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcFileName); - - assert srcEntry != null : "Deletion victim not found in parent listing [path=" + path + - ", name=" + srcFileName + ", listing=" + srcParentInfo.listing() + ']'; - - assert victimId.equals(srcEntry.fileId()); - - transferEntry(srcEntry, srcParentId, srcFileName, trashId, destFileName); + transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName); if (victimInfo.isFile()) - // Update a file info of the removed file with a file path, - // which will be used by delete worker for event notifications. invokeUpdatePath(victimId, path); tx.commit(); @@ -1647,74 +1649,57 @@ public class IgfsMetaManager extends IgfsManager { * * @param path The path to create. * @param props The properties to use for created directories. - * @return True iff a directory was created during the operation. + * @return True if a directory was created during the operation. * @throws IgniteCheckedException If a non-directory file exists on the requested path, and in case of other errors. */ boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { - assert props != null; validTxState(false); - DirectoryChainBuilder b = null; - while (true) { if (busyLock.enterBusy()) { try { - b = new DirectoryChainBuilder(path, props); + // Prepare path IDs. + IgfsPathIds pathIds = pathIds(path); + + // Prepare lock IDs. Essentially, they consist of two parts: existing IDs and potential new IDs. + Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); + + pathIds.addExistingIds(lockIds); + pathIds.addSurrogateIds(lockIds); + + assert lockIds.size() == pathIds.count(); // Start TX. IgniteInternalTx tx = startTx(); try { - final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet); + final Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds); - // If the path was changed, we close the current Tx and repeat the procedure again - // starting from taking the path ids. - if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) { - // Locked path okay, trying to proceed with the remainder creation. - IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId); + if (!pathIds.verifyIntegrity(lockInfos)) + // Directory structure changed concurrently. So we simply re-try. + continue; - // Check only the lowermost directory in the existing directory chain - // because others are already checked in #verifyPathIntegrity() above. - if (!lowermostExistingInfo.isDirectory()) + // Check if the whole structure is already in place. + if (pathIds.allExists()) { + if (lockInfos.get(pathIds.lastExistingId()).isDirectory()) + return false; + else throw new IgfsParentNotDirectoryException("Failed to create directory (parent " + "element is not a directory)"); + } - if (b.existingIdCnt == b.components.size() + 1) { - assert b.existingPath.equals(path); - assert lockedInfos.size() == b.existingIdCnt; - - // The target directory already exists, nothing to do. - // (The fact that all the path consisns of directories is already checked above). - // Note that properties are not updated in this case. - return false; - } - - Map<String, IgfsListingEntry> parentListing = lowermostExistingInfo.listing(); - - String shortName = b.components.get(b.existingIdCnt - 1); - - IgfsListingEntry entry = parentListing.get(shortName); + IgfsPathsCreateResult res = createDirectory(pathIds, lockInfos, props); - if (entry == null) { - b.doBuild(); + if (res == null) + continue; - tx.commit(); + // Commit TX. + tx.commit(); - break; - } - else { - // Another thread created file or directory with the same name. - if (!entry.isDirectory()) { - // Entry exists, and it is not a directory: - throw new IgfsParentNotDirectoryException("Failed to create directory (parent " + - "element is not a directory)"); - } + generateCreateEvents(res.createdPaths(), false); - // If this is a directory, we continue the repeat loop, - // because we cannot lock this directory without - // lock ordering rule violation. - } - } + // We are done. + return true; } finally { tx.close(); @@ -1727,12 +1712,6 @@ public class IgfsMetaManager extends IgfsManager { else throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']'); } - - assert b != null; - - b.sendEvents(); - - return true; } /** @@ -1960,11 +1939,12 @@ public class IgfsMetaManager extends IgfsManager { parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null); // Fire notification about missing directories creation. - if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) { + if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) { IgfsPath evtPath = parent0; while (!parentPath.equals(evtPath)) { - pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EVT_IGFS_DIR_CREATED)); + pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, + EventType.EVT_IGFS_DIR_CREATED)); evtPath = evtPath.parent(); @@ -2010,8 +1990,8 @@ public class IgfsMetaManager extends IgfsManager { } // Record CREATE event if needed. - if (evts.isRecordable(EVT_IGFS_FILE_CREATED)) - pendingEvts.add(new IgfsEvent(path, locNode, EVT_IGFS_FILE_CREATED)); + if (evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED)) + pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED)); return new IgfsSecondaryOutputStreamDescriptor(parentInfo.id(), newInfo, out); } @@ -2285,11 +2265,11 @@ public class IgfsMetaManager extends IgfsManager { synchronize(fs, parentPath, parentPathInfo, path, true, null); - if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) { + if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) { IgfsPath evtPath = path; while (!parentPath.equals(evtPath)) { - pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EVT_IGFS_DIR_CREATED)); + pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EventType.EVT_IGFS_DIR_CREATED)); evtPath = evtPath.parent(); @@ -2386,15 +2366,15 @@ public class IgfsMetaManager extends IgfsManager { // Record event if needed. if (srcInfo.isFile()) { - if (evts.isRecordable(EVT_IGFS_FILE_RENAMED)) + if (evts.isRecordable(EventType.EVT_IGFS_FILE_RENAMED)) pendingEvts.add(new IgfsEvent( src, destInfo == null ? dest : new IgfsPath(dest, src.name()), locNode, - EVT_IGFS_FILE_RENAMED)); + EventType.EVT_IGFS_FILE_RENAMED)); } - else if (evts.isRecordable(EVT_IGFS_DIR_RENAMED)) - pendingEvts.add(new IgfsEvent(src, dest, locNode, EVT_IGFS_DIR_RENAMED)); + else if (evts.isRecordable(EventType.EVT_IGFS_DIR_RENAMED)) + pendingEvts.add(new IgfsEvent(src, dest, locNode, EventType.EVT_IGFS_DIR_RENAMED)); return true; } @@ -2896,7 +2876,7 @@ public class IgfsMetaManager extends IgfsManager { * @return Transaction. */ private IgniteInternalTx startTx() { - return metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ); + return metaCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ); } /** @@ -3178,6 +3158,14 @@ public class IgfsMetaManager extends IgfsManager { private IgfsListingEntry entry; /** + * Empty constructor required for {@link Externalizable}. + * + */ + public ListingAddProcessor() { + // No-op. + } + + /** * Constructs update directory listing closure. * * @param fileName File name to add into parent listing. @@ -3191,19 +3179,10 @@ public class IgfsMetaManager extends IgfsManager { this.entry = entry; } - /** - * Empty constructor required for {@link Externalizable}. - * - */ - public ListingAddProcessor() { - // No-op. - } - /** {@inheritDoc} */ @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args) { IgfsFileInfo fileInfo = e.getValue(); - assert fileInfo != null : "File info not found for the child: " + entry.fileId(); assert fileInfo.isDirectory(); Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); @@ -3240,6 +3219,73 @@ public class IgfsMetaManager extends IgfsManager { } /** + * Listing replace processor. + */ + private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Name. */ + private String name; + + /** New ID. */ + private IgniteUuid id; + + /** + * Constructor. + */ + public ListingReplaceProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param name Name. + * @param id ID. + */ + public ListingReplaceProcessor(String name, IgniteUuid id) { + this.name = name; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args) + throws EntryProcessorException { + IgfsFileInfo fileInfo = e.getValue(); + + assert fileInfo.isDirectory(); + + Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); + + // Modify listing in-place. + IgfsListingEntry oldEntry = listing.get(name); + + if (oldEntry == null) + throw new IgniteException("Directory listing doesn't contain expected entry: " + name); + + listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory())); + + e.setValue(new IgfsFileInfo(listing, fileInfo)); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, name); + out.writeObject(id); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + name = U.readString(in); + id = (IgniteUuid)in.readObject(); + } + } + + /** * Update path closure. */ @GridInternal @@ -3291,170 +3337,92 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Create a new file. + * Append routine. * * @param path Path. - * @param bufSize Buffer size. - * @param overwrite Overwrite flag. + * @param dirProps Directory properties. + * @param create Create flag. + * @param blockSize Block size. * @param affKey Affinity key. - * @param replication Replication factor. - * @param props Properties. - * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method. - * @return Tuple containing the created file info and its parent id. + * @param evictExclude Evict exclude flag. + * @param fileProps File properties. + * @return Tuple containing the file info and its parent id. + * @throws IgniteCheckedException If failed. */ - IgniteBiTuple<IgfsFileInfo, IgniteUuid> create( + IgniteBiTuple<IgfsFileInfo, IgniteUuid> append( final IgfsPath path, - final boolean append, - final boolean overwrite, Map<String, String> dirProps, + final boolean create, final int blockSize, final @Nullable IgniteUuid affKey, final boolean evictExclude, @Nullable Map<String, String> fileProps) throws IgniteCheckedException { validTxState(false); - assert path != null; - - final String name = path.name(); - - DirectoryChainBuilder b = null; - - IgniteUuid trashId = IgfsUtils.randomTrashId(); - while (true) { if (busyLock.enterBusy()) { try { - b = new DirectoryChainBuilder(path, dirProps, fileProps, blockSize, affKey, evictExclude); - - // Start Tx: - IgniteInternalTx tx = startTx(); - - try { - if (overwrite) - // Lock also the TRASH directory because in case of overwrite we - // may need to delete the old file: - b.idSet.add(trashId); - - final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet); - - assert !overwrite || lockedInfos.get(trashId) != null; // TRASH must exist at this point. - - // If the path was changed, we close the current Tx and repeat the procedure again - // starting from taking the path ids. - if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) { - // Locked path okay, trying to proceed with the remainder creation. - final IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId); + // Prepare path IDs. + IgfsPathIds pathIds = pathIds(path); - if (b.existingIdCnt == b.components.size() + 1) { - // Full requestd path exists. + // Fail-fast: create flag is not specified and some paths are missing. + if (!pathIds.allExists() && !create) + throw new IgfsPathNotFoundException("Failed to append because file is not found: " + path); - assert b.existingPath.equals(path); - assert lockedInfos.size() == - (overwrite ? b.existingIdCnt + 1/*TRASH*/ : b.existingIdCnt); - - if (lowermostExistingInfo.isDirectory()) { - throw new IgfsPathAlreadyExistsException("Failed to " - + (append ? "open" : "create") + " file (path points to an " + - "existing directory): " + path); - } - else { - // This is a file. - assert lowermostExistingInfo.isFile(); - - final IgniteUuid parentId = b.idList.get(b.idList.size() - 2); - - final IgniteUuid lockId = lowermostExistingInfo.lockId(); - - if (append) { - if (lockId != null) - throw fsException("Failed to open file (file is opened for writing) " - + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id() - + ", lockId=" + lockId + ']'); - - IgfsFileInfo lockedInfo = invokeLock(lowermostExistingInfo.id(), false); - - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(lockedInfo, parentId); - - tx.commit(); - - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, - EventType.EVT_IGFS_FILE_OPENED_WRITE); - - return t2; - } - else if (overwrite) { - // Delete existing file, but fail if it is locked: - if (lockId != null) - throw fsException("Failed to overwrite file (file is opened for writing) " + - "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id() - + ", lockId=" + lockId + ']'); + // Prepare lock IDs. + Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); - final IgfsListingEntry deletedEntry = lockedInfos.get(parentId).listing() - .get(name); + pathIds.addExistingIds(lockIds); + pathIds.addSurrogateIds(lockIds); - assert deletedEntry != null; - - transferEntry(deletedEntry, parentId, name, trashId, - lowermostExistingInfo.id().toString()); - - // Update a file info of the removed file with a file path, - // which will be used by delete worker for event notifications. - invokeUpdatePath(lowermostExistingInfo.id(), path); - - // Make a new locked info: - long t = System.currentTimeMillis(); - - final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L, - affKey, createFileLockId(false), evictExclude, fileProps, t, t); - - assert newFileInfo.lockId() != null; // locked info should be created. - - createNewEntry(newFileInfo, parentId, name); - - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newFileInfo, parentId); + // Start TX. + IgniteInternalTx tx = startTx(); - tx.commit(); + try { + Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds); - delWorker.signal(); + if (!pathIds.verifyIntegrity(lockInfos)) + // Directory structure changed concurrently. So we simply re-try. + continue; - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE); + if (pathIds.allExists()) { + // All participants are found. Simply open the stream. + IgfsFileInfo info = lockInfos.get(pathIds.lastId()); - return t2; - } - else { - throw new IgfsPathAlreadyExistsException("Failed to create file (file " + - "already exists and overwrite flag is false): " + path); - } - } - } + // Check: is it a file? + if (!info.isFile()) + throw new IgfsPathIsDirectoryException("Failed to open file for write." + path); - // The full requested path does not exist. + // Check if file already opened for write. + if (info.lockId() != null) + throw new IgfsException("File is already opened for write: " + path); - // Check only the lowermost directory in the existing directory chain - // because others are already checked in #verifyPathIntegrity() above. - if (!lowermostExistingInfo.isDirectory()) - throw new IgfsParentNotDirectoryException("Failed to " + (append ? "open" : "create" ) - + " file (parent element is not a directory)"); + // At this point we can open the stream safely. + info = invokeLock(info.id(), false); - final String uppermostFileToBeCreatedName = b.components.get(b.existingIdCnt - 1); + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(info, pathIds.lastParentId()); - if (!lowermostExistingInfo.hasChild(uppermostFileToBeCreatedName)) { - b.doBuild(); + tx.commit(); - assert b.leafInfo != null; - assert b.leafParentId != null; + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(b.leafInfo, b.leafParentId); + return t2; + } + else { + // Create file and parent folders. + IgfsPathsCreateResult res = + createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude); - tx.commit(); + if (res == null) + continue; - b.sendEvents(); + // Commit. + tx.commit(); - return t2; - } + // Generate events. + generateCreateEvents(res.createdPaths(), true); - // Another thread concurrently created file or directory in the path with - // the name we need. + return new T2<>(res.info(), res.parentId()); } } finally { @@ -3464,224 +3432,292 @@ public class IgfsMetaManager extends IgfsManager { finally { busyLock.leaveBusy(); } - } else - throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']'); + } + else + throw new IllegalStateException("Failed to append for file because Grid is stopping:" + path); } } - /** File chain builder. */ - private class DirectoryChainBuilder { - /** The requested path to be created. */ - private final IgfsPath path; + /** + * Create a new file. + * + * @param path Path. + * @param bufSize Buffer size. + * @param overwrite Overwrite flag. + * @param affKey Affinity key. + * @param replication Replication factor. + * @param props Properties. + * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method. + * @return Tuple containing the created file info and its parent id. + */ + IgniteBiTuple<IgfsFileInfo, IgniteUuid> create( + final IgfsPath path, + Map<String, String> dirProps, + final boolean overwrite, + final int blockSize, + final @Nullable IgniteUuid affKey, + final boolean evictExclude, + @Nullable Map<String, String> fileProps) throws IgniteCheckedException { + validTxState(false); - /** Full path components. */ - private final List<String> components; + while (true) { + if (busyLock.enterBusy()) { + try { + // Prepare path IDs. + IgfsPathIds pathIds = pathIds(path); - /** The list of ids. */ - private final List<IgniteUuid> idList; + // Prepare lock IDs. + Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR); - /** The set of ids. */ - private final SortedSet<IgniteUuid> idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR); + pathIds.addExistingIds(lockIds); + pathIds.addSurrogateIds(lockIds); - /** The middle node properties. */ - private final Map<String, String> props; + // In overwrite mode we also lock ID of potential replacement as well as trash ID. + IgniteUuid overwriteId = IgniteUuid.randomUuid(); + IgniteUuid trashId = IgfsUtils.randomTrashId(); - /** The leaf node properties. */ - private final Map<String, String> leafProps; + if (overwrite) { + lockIds.add(overwriteId); - /** The lowermost exsiting path id. */ - private final IgniteUuid lowermostExistingId; + // Trash ID is only added if we suspect conflict. + if (pathIds.allExists()) + lockIds.add(trashId); + } - /** The existing path. */ - private final IgfsPath existingPath; + // Start TX. + IgniteInternalTx tx = startTx(); - /** The created leaf info. */ - private IgfsFileInfo leafInfo; + try { + Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds); - /** The leaf parent id. */ - private IgniteUuid leafParentId; + if (!pathIds.verifyIntegrity(lockInfos)) + // Directory structure changed concurrently. So we simply re-try. + continue; - /** The number of existing ids. */ - private final int existingIdCnt; + if (pathIds.allExists()) { + // All participants found. + IgfsFileInfo oldInfo = lockInfos.get(pathIds.lastId()); - /** Whether laef is directory. */ - private final boolean leafDir; + // Check: is it a file? + if (!oldInfo.isFile()) + throw new IgfsPathIsDirectoryException("Failed to create a file: " + path); - /** Block size. */ - private final int blockSize; + // Check: can we overwrite it? + if (!overwrite) + throw new IgfsPathAlreadyExistsException("Failed to create a file: " + path); - /** Affinity key. */ - private final IgniteUuid affKey; + // Check if file already opened for write. + if (oldInfo.lockId() != null) + throw new IgfsException("File is already opened for write: " + path); - /** Evict exclude flag. */ - private final boolean evictExclude; + // At this point file can be re-created safely. - /** - * Constructor for directories. - * - * @param path Path. - * @param props Properties. - * @throws IgniteCheckedException If failed. - */ - protected DirectoryChainBuilder(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { - this(path, props, props, true, 0, null, false); - } + // First step: add existing to trash listing. + IgniteUuid oldId = pathIds.lastId(); - /** - * Constructor for files. - * - * @param path Path. - * @param dirProps Directory properties. - * @param fileProps File properties. - * @param blockSize Block size. - * @param affKey Affinity key (optional). - * @param evictExclude Evict exclude flag. - * @throws IgniteCheckedException If failed. - */ - protected DirectoryChainBuilder(IgfsPath path, Map<String, String> dirProps, Map<String, String> fileProps, - int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) - throws IgniteCheckedException { - this(path, dirProps, fileProps, false, blockSize, affKey, evictExclude); - } + id2InfoPrj.invoke(trashId, new ListingAddProcessor(oldId.toString(), + new IgfsListingEntry(oldId, true))); - /** - * Constructor. - * - * @param path Path. - * @param props Middle properties. - * @param leafProps Leaf properties. - * @param leafDir Whether leaf is directory or file. - * @param blockSize Block size. - * @param affKey Affinity key (optional). - * @param evictExclude Evict exclude flag. - * @throws IgniteCheckedException If failed. - */ - private DirectoryChainBuilder(IgfsPath path, Map<String,String> props, Map<String,String> leafProps, - boolean leafDir, int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) - throws IgniteCheckedException { - this.path = path; - this.components = path.components(); - this.idList = fileIds(path); - this.props = props; - this.leafProps = leafProps; - this.leafDir = leafDir; - this.blockSize = blockSize; - this.affKey = affKey; - this.evictExclude = evictExclude; + // Second step: replace ID in parent directory. + String name = pathIds.lastPart(); + IgniteUuid parentId = pathIds.lastParentId(); - // Store all the non-null ids in the set & construct existing path in one loop: - IgfsPath existingPath = path.root(); + id2InfoPrj.invoke(parentId, new ListingReplaceProcessor(name, overwriteId)); - assert idList.size() == components.size() + 1; + // Third step: create the file. + long createTime = System.currentTimeMillis(); - // Find the lowermost existing id: - IgniteUuid lowermostExistingId = null; + IgfsFileInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime, + fileProps, blockSize, affKey, createFileLockId(false), evictExclude)); - int idIdx = 0; + // Fourth step: update path of remove file. + invokeUpdatePath(oldId, path); - for (IgniteUuid id : idList) { - if (id == null) - break; + // Prepare result and commit. + IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newInfo, parentId); - lowermostExistingId = id; + tx.commit(); - boolean added = idSet.add(id); + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE); - assert added : "Not added id = " + id; + return t2; + } + else { + // Create file and parent folders. + IgfsPathsCreateResult res = + createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude); - if (idIdx >= 1) // skip root. - existingPath = new IgfsPath(existingPath, components.get(idIdx - 1)); + if (res == null) + continue; - idIdx++; - } + // Commit. + tx.commit(); - assert idSet.contains(IgfsUtils.ROOT_ID); + // Generate events. + generateCreateEvents(res.createdPaths(), true); - this.lowermostExistingId = lowermostExistingId; + return new T2<>(res.info(), res.parentId()); + } + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']'); + } + } - this.existingPath = existingPath; + /** + * Create directory and it's parents. + * + * @param pathIds Path IDs. + * @param lockInfos Lock infos. + * @param dirProps Directory properties. + * @return Result or {@code} if the first parent already contained child with the same name. + * @throws IgniteCheckedException If failed. + */ + @Nullable IgfsPathsCreateResult createDirectory(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos, + Map<String, String> dirProps) throws IgniteCheckedException { + // Check if entry we are going to write to is directory. + if (lockInfos.get(pathIds.lastExistingId()).isFile()) + throw new IgfsParentNotDirectoryException("Failed to create directory (parent " + + "element is not a directory)"); + + return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false); + } - this.existingIdCnt = idSet.size(); - } + /** + * Create file and all it's parents. + * + * @param pathIds Paths IDs. + * @param lockInfos Lock infos. + * @param dirProps Directory properties. + * @param fileProps File propertris. + * @param blockSize Block size. + * @param affKey Affinity key (optional) + * @param evictExclude Evict exclude flag. + * @return Result or {@code} if the first parent already contained child with the same name. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos, + Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey, + boolean evictExclude) throws IgniteCheckedException{ + // Check if entry we are going to write to is directory. + if (lockInfos.get(pathIds.lastExistingId()).isFile()) + throw new IgfsParentNotDirectoryException("Failed to open file for write " + + "(parent element is not a directory): " + pathIds.path()); + + return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude); + } - /** - * Does the main portion of job building the renmaining path. - */ - public final void doBuild() throws IgniteCheckedException { - // Fix current time. It will be used in all created entities. - long createTime = System.currentTimeMillis(); + /** + * Ceate file or directory. + * + * @param dir Directory flag. + * @param pathIds Path IDs. + * @param lockInfos Lock infos. + * @param dirProps Directory properties. + * @param fileProps File properties. + * @param blockSize Block size. + * @param affKey Affinity key. + * @param evictExclude Evict exclude flag. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds, + Map<IgniteUuid, IgfsFileInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps, + int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) throws IgniteCheckedException { + // This is our starting point. + int lastExistingIdx = pathIds.lastExistingIndex(); + IgfsFileInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId()); + + // If current info already contains entry with the same name as it's child, then something + // has changed concurrently. We must re-try because we cannot get info of this unexpected + // element due to possible deadlocks. + int curIdx = lastExistingIdx + 1; + + String curPart = pathIds.part(curIdx); + IgniteUuid curId = pathIds.surrogateId(curIdx); + IgniteUuid curParentId = lastExistingInfo.id(); + + if (lastExistingInfo.hasChild(curPart)) + return null; - IgfsListingEntry childInfo = null; - String childName = null; + // First step: add new entry to the last existing element. + id2InfoPrj.invoke(lastExistingInfo.id(), new ListingAddProcessor(curPart, + new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx)))); - IgniteUuid parentId = null; + // Events support. + IgfsPath lastCreatedPath = pathIds.lastExistingPath(); - // This loop creates the missing directory chain from the bottom to the top: - for (int i = components.size() - 1; i >= existingIdCnt - 1; i--) { - IgniteUuid childId = IgniteUuid.randomUuid(); - boolean childDir; + List<IgfsPath> createdPaths = new ArrayList<>(pathIds.count() - curIdx); - if (childName == null) { - assert childInfo == null; + // Second step: create middle directories. + long createTime = System.currentTimeMillis(); - if (leafDir) { - childDir = true; + while (curIdx < pathIds.count() - 1) { + int nextIdx = curIdx + 1; - leafInfo = invokeAndGet(childId, new DirectoryCreateProcessor(createTime, leafProps)); - } - else { - childDir = false; + String nextPart = pathIds.part(nextIdx); + IgniteUuid nextId = pathIds.surrogateId(nextIdx); - leafInfo = invokeAndGet(childId, new FileCreateProcessor(createTime, leafProps, blockSize, - affKey, createFileLockId(false), evictExclude)); - } - } - else { - assert childInfo != null; + id2InfoPrj.invoke(curId, new DirectoryCreateProcessor(createTime, dirProps, + nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx)))); - childDir = true; + // Save event. + lastCreatedPath = new IgfsPath(lastCreatedPath, curPart); - id2InfoPrj.invoke(childId, new DirectoryCreateProcessor(createTime, props, childName, childInfo)); + createdPaths.add(lastCreatedPath); - if (parentId == null) - parentId = childId; - } + // Advance things further. + curIdx++; - childInfo = new IgfsListingEntry(childId, childDir); + curParentId = curId; - childName = components.get(i); - } + curPart = nextPart; + curId = nextId; + } - if (parentId == null) - parentId = lowermostExistingId; + // Third step: create leaf. + IgfsFileInfo info; - leafParentId = parentId; + if (dir) + info = invokeAndGet(curId, new DirectoryCreateProcessor(createTime, dirProps)); + else + info = invokeAndGet(curId, new FileCreateProcessor(createTime, fileProps, + blockSize, affKey, createFileLockId(false), evictExclude)); - // Now link the newly created directory chain to the lowermost existing parent: - id2InfoPrj.invoke(lowermostExistingId, new ListingAddProcessor(childName, childInfo)); - } + createdPaths.add(pathIds.path()); - /** - * Sends events. - */ - public final void sendEvents() { - if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) { - IgfsPath createdPath = existingPath; + return new IgfsPathsCreateResult(createdPaths, info, curParentId); + } - for (int i = existingPath.components().size(); i < components.size() - 1; i++) { - createdPath = new IgfsPath(createdPath, components.get(i)); + /** + * Generate events for created file or directory. + * + * @param createdPaths Created paths. + * @param file Whether file was created. + */ + private void generateCreateEvents(List<IgfsPath> createdPaths, boolean file) { + if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) { + for (int i = 0; i < createdPaths.size() - 1; i++) + IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPaths.get(i), + EventType.EVT_IGFS_DIR_CREATED); + } - IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPath, EVT_IGFS_DIR_CREATED); - } - } + IgfsPath leafPath = createdPaths.get(createdPaths.size() - 1); - if (leafDir) - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_DIR_CREATED); - else { - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_CREATED); - IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE); - } + if (file) { + IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_CREATED); + IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE); } + else + IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java new file mode 100644 index 0000000..1f669b0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.Map; + +/** + * Path IDs abstraction. Contains path and corresponding IDs. + */ +public class IgfsPathIds { + /** Original path. */ + private final IgfsPath path; + + /** Path parts. */ + private final String[] parts; + + /** IDs. */ + private final IgniteUuid[] ids; + + /** Surrogate IDs for paths which doesn't exist yet. Initialized on demand. */ + private IgniteUuid[] surrogateIds; + + /** Index of last existing ID. */ + private final int lastExistingIdx; + + /** + * Constructor. + * + * @param path Path. + * @param parts Path parts. + * @param ids IDs. + */ + public IgfsPathIds(IgfsPath path, String[] parts, IgniteUuid[] ids) { + assert path != null; + assert parts.length == ids.length; + + this.path = path; + this.parts = parts; + this.ids = ids; + + int lastExistingIdx0 = -1; + + for (int i = parts.length - 1; i >= 0; i--) { + if (ids[i] != null) { + lastExistingIdx0 = i; + + break; + } + } + + assert lastExistingIdx0 >= 0; + + lastExistingIdx = lastExistingIdx0; + } + + /** + * Get parent entity. + * + * @return Parent entity. + */ + public IgfsPathIds parent() { + assert ids.length > 1; + + String[] parentParts = new String[parts.length - 1]; + IgniteUuid[] parentIds = new IgniteUuid[ids.length - 1]; + + System.arraycopy(parts, 0, parentParts, 0, parentParts.length); + System.arraycopy(ids, 0, parentIds, 0, parentIds.length); + + return new IgfsPathIds(path.parent(), parentParts, parentIds); + } + + /** + * Get number of elements. + * + * @return ID count. + */ + public int count() { + return ids.length; + } + + /** + * Get original path. + * + * @return Path. + */ + public IgfsPath path() { + return path; + } + + /** + * Get path part at the given index. + * + * @param idx Index. + * @return Path part. + */ + public String part(int idx) { + assert idx < parts.length; + + return parts[idx]; + } + + /** + * Get last part of original path. + * + * @return Last part. + */ + public String lastPart() { + return parts[parts.length - 1]; + } + + /** + * Get last ID. + * + * @return Last ID. + */ + @Nullable public IgniteUuid lastId() { + return ids[ids.length - 1]; + } + + /** + * Get last parent ID. + * + * @return Last parent ID. + */ + @Nullable public IgniteUuid lastParentId() { + return ids[ids.length - 2]; + } + + /** + * Whether provided index denotes last entry in the path. + * + * @param idx Index. + * @return {@code True} if last. + */ + public boolean isLastIndex(int idx) { + return idx == parts.length - 1; + } + + /** + * Get path of the last existing element. + * + * @return Path of the last existing element. + */ + public IgfsPath lastExistingPath() { + IgfsPath path = new IgfsPath(); + + for (int i = 1; i <= lastExistingIdx; i++) + path = new IgfsPath(path, parts[i]); + + return path; + } + + /** + * Whether all parts exists. + * + * @return {@code True} if all parts were found. + */ + public boolean allExists() { + return parts.length == lastExistingIdx + 1; + } + + /** + * Whether last entry exists. + * + * @return {@code True} if exists. + */ + public boolean lastExists() { + return lastExistingIdx == ids.length - 1; + } + + + /** + * Whether parent of the last entry exists. + * + * @return {@code True} if exists. + */ + public boolean lastParentExists() { + return ids.length > 1 && lastExistingIdx == ids.length - 2; + } + + /** + * Get ID of the last existing entry. + * + * @return ID of the last existing entry. + */ + public IgniteUuid lastExistingId() { + return ids[lastExistingIdx]; + } + + /** + * Get index of the last existing entry. + * + * @return Index of the last existing entry. + */ + public int lastExistingIndex() { + return lastExistingIdx; + } + + /** + * Add existing IDs to provided collection. + * + * @param col Collection. + */ + @SuppressWarnings("ManualArrayToCollectionCopy") + public void addExistingIds(Collection<IgniteUuid> col) { + for (int i = 0; i <= lastExistingIdx; i++) + col.add(ids[i]); + } + + /** + * Add surrogate IDs to provided collection potentially creating them on demand. + * + * @param col Collection. + */ + @SuppressWarnings("ManualArrayToCollectionCopy") + public void addSurrogateIds(Collection<IgniteUuid> col) { + if (surrogateIds == null) { + surrogateIds = new IgniteUuid[ids.length]; + + for (int i = lastExistingIdx + 1; i < surrogateIds.length; i++) + surrogateIds[i] = IgniteUuid.randomUuid(); + } + + for (int i = lastExistingIdx + 1; i < surrogateIds.length; i++) + col.add(surrogateIds[i]); + } + + /** + * Get surrogate ID at the given index. + * + * @param idx Index. + * @return Surrogate ID. + */ + public IgniteUuid surrogateId(int idx) { + assert surrogateIds != null; + + assert idx > lastExistingIdx; + assert idx < surrogateIds.length; + + return surrogateIds[idx]; + } + + /** + * Verify that observed paths are found in provided infos in the right order. + * + * @param infos Info. + * @return {@code True} if full integrity is preserved. + */ + public boolean verifyIntegrity(Map<IgniteUuid, IgfsFileInfo> infos) { + for (int i = 0; i <= lastExistingIdx; i++) { + IgniteUuid curId = ids[i]; + IgfsFileInfo curInfo = infos.get(curId); + + // Check if required ID is there. + if (curInfo == null) + return false; + + // For non-leaf entry we check if child exists. + if (i < lastExistingIdx) { + String childName = parts[i + 1]; + IgniteUuid childId = ids[i + 1]; + + if (!curInfo.hasChild(childName, childId)) + return false; + } + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java new file mode 100644 index 0000000..3b620f8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; + +import java.util.List; + +/** + * IGFS paths create result. + */ +public class IgfsPathsCreateResult { + /** Created paths. */ + private final List<IgfsPath> paths; + + /** Info of the last created file. */ + private final IgfsFileInfo info; + + /** Parent ID. */ + private final IgniteUuid parentId; + + /** + * Constructor. + * + * @param paths Created paths. + * @param info Info of the last created file. + * @param parentId Parent ID. + */ + public IgfsPathsCreateResult(List<IgfsPath> paths, IgfsFileInfo info, IgniteUuid parentId) { + this.paths = paths; + this.info = info; + this.parentId = parentId; + } + + /** + * @return Created paths. + */ + public List<IgfsPath> createdPaths() { + return paths; + } + + /** + * @return Info of the last created file. + */ + public IgfsFileInfo info() { + return info; + } + + /** + * @return Parent ID. + */ + public IgniteUuid parentId() { + return parentId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsPathsCreateResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index edded2f..b176e21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -17,10 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.lang.reflect.Constructor; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; @@ -43,6 +39,10 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; +import java.lang.reflect.Constructor; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -104,7 +104,17 @@ public class IgfsUtils { * @return {@code True} if this is root ID or trash ID. */ public static boolean isRootOrTrashId(@Nullable IgniteUuid id) { - return id != null && (ROOT_ID.equals(id) || isTrashId(id)); + return isRootId(id) || isTrashId(id); + } + + /** + * Check whether provided ID is root ID. + * + * @param id ID. + * @return {@code True} if this is root ID. + */ + public static boolean isRootId(@Nullable IgniteUuid id) { + return id != null && ROOT_ID.equals(id); } /** @@ -114,7 +124,8 @@ public class IgfsUtils { * @return {@code True} if this is trash ID. */ private static boolean isTrashId(IgniteUuid id) { - assert id != null; + if (id == null) + return false; UUID gid = id.globalId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 52d8bd5..2acf59c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -17,27 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Field; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -79,6 +58,28 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -1449,19 +1450,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { createCtr.incrementAndGet(); } catch (IgniteException e) { - Throwable[] chain = X.getThrowables(e); - - Throwable cause = chain[chain.length - 1]; - - if (!e.getMessage().startsWith("Failed to overwrite file (file is opened for writing)") - && (cause == null - || !cause.getMessage().startsWith("Failed to overwrite file (file is opened for writing)"))) { - - System.out.println("Failed due to IgniteException exception. Cause:"); - cause.printStackTrace(System.out); - - err.compareAndSet(null, e); - } + // No-op. } catch (IOException e) { err.compareAndSet(null, e); @@ -1937,15 +1926,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest { chunksCtr.incrementAndGet(); } - catch (IgniteException e) { - Throwable[] chain = X.getThrowables(e); - - Throwable cause = chain[chain.length - 1]; - - if (!e.getMessage().startsWith("Failed to open file (file is opened for writing)") - && (cause == null - || !cause.getMessage().startsWith("Failed to open file (file is opened for writing)"))) - err.compareAndSet(null, e); + catch (IgniteException ignore) { + // No-op. } catch (IOException e) { err.compareAndSet(null, e);
