Repository: ignite Updated Branches: refs/heads/master 96c8c246b -> 8eed48b6b
IGNITE-1679: IGFS: Restored PURGE events. This closes #262. This closes #605. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/519ae631 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/519ae631 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/519ae631 Branch: refs/heads/master Commit: 519ae631e1931f227111d50c4709bbd70ae4d781 Parents: 6578c8b Author: iveselovskiy <[email protected]> Authored: Tue Apr 12 15:32:06 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Apr 12 15:32:06 2016 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsDeleteWorker.java | 16 +++++++-- .../processors/igfs/IgfsMetaManager.java | 26 +++++++-------- .../internal/processors/igfs/IgfsUtils.java | 34 ++++++++++++++++++++ ...IgfsMetaDirectoryListingRemoveProcessor.java | 2 -- .../ignite/igfs/IgfsEventsAbstractSelfTest.java | 26 +++++++++++---- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 6 ++++ 6 files changed, 86 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/519ae631/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java index e5914e0..aef60dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -39,6 +40,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; +import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED; /** * IGFS worker for removal from the trash directory. @@ -167,7 +169,7 @@ public class IgfsDeleteWorker extends IgfsThread { try { info = meta.info(trashId); } - catch(ClusterTopologyServerNotFoundException e) { + catch (ClusterTopologyServerNotFoundException e) { LT.warn(log, e, "Server nodes not found."); } catch (IgniteCheckedException e) { @@ -245,7 +247,17 @@ public class IgfsDeleteWorker extends IgfsThread { // In case this node crashes, other node will re-delete the file. data.delete(lockedInfo).get(); - return meta.delete(trashId, name, id); + boolean ret = meta.delete(trashId, name, id); + + if (ret) { + IgfsPath path = IgfsUtils.extractOriginalPathFromTrash(name); + + assert path != null; + + IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_PURGED); + } + + return ret; } } else http://git-wip-us.apache.org/repos/asf/ignite/blob/519ae631/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 7b1d68c..73d0887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -968,7 +968,7 @@ public class IgfsMetaManager extends IgfsManager { * @param destParentId New parent directory ID. * @throws IgniteCheckedException If failed. */ - private void moveNonTx(IgniteUuid fileId, @Nullable String srcFileName, IgniteUuid srcParentId, String destFileName, + private void moveNonTx(IgniteUuid fileId, String srcFileName, IgniteUuid srcParentId, String destFileName, IgniteUuid destParentId) throws IgniteCheckedException { validTxState(true); @@ -1139,7 +1139,8 @@ public class IgfsMetaManager extends IgfsManager { // Prepare trash data. IgfsEntryInfo trashInfo = lockInfos.get(trashId); - final String trashName = victimId.toString(); + + final String trashName = IgfsUtils.composeNameForTrash(path, victimId); assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " + "destination directory (file already exists) [destName=" + trashName + ']'; @@ -1169,14 +1170,14 @@ public class IgfsMetaManager extends IgfsManager { * Move path to the trash directory in existing transaction. * * @param parentId Parent ID. - * @param name Path name. + * @param path Path name. * @param id Path ID. - * @param trashId Trash ID. + * @param trashId Trash folder ID. * @return ID of an entry located directly under the trash directory. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("RedundantCast") - @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, @Nullable String name, IgniteUuid id, + @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, IgfsPath path, IgniteUuid id, IgniteUuid trashId) throws IgniteCheckedException { validTxState(true); @@ -1208,9 +1209,7 @@ public class IgfsMetaManager extends IgfsManager { lockIds(lockIds); // Construct new info and move locked entries from root to it. - Map<String, IgfsListingEntry> transferListing = new HashMap<>(); - - transferListing.putAll(rootListing); + Map<String, IgfsListingEntry> transferListing = new HashMap<>(rootListing); IgfsEntryInfo newInfo = IgfsUtils.createDirectory( IgniteUuid.randomUuid(), @@ -1234,7 +1233,7 @@ public class IgfsMetaManager extends IgfsManager { // Ensure trash directory existence. createSystemDirectoryIfAbsent(trashId); - moveNonTx(id, name, parentId, id.toString(), trashId); + moveNonTx(id, path.name(), parentId, IgfsUtils.composeNameForTrash(path, id), trashId); resId = id; } @@ -2406,12 +2405,12 @@ public class IgfsMetaManager extends IgfsManager { if (path.parent() != null) { assert infos.containsKey(path.parent()); - softDeleteNonTx(infos.get(path.parent()).id(), path.name(), info.id(), trashId); + softDeleteNonTx(infos.get(path.parent()).id(), path, info.id(), trashId); } else { assert IgfsUtils.ROOT_ID.equals(info.id()); - softDeleteNonTx(null, path.name(), info.id(), trashId); + softDeleteNonTx(null, path, info.id(), trashId); } return true; // No additional handling is required. @@ -2644,7 +2643,6 @@ public class IgfsMetaManager extends IgfsManager { pathIds.add(fileIds(path)); // Start pessimistic. - try (IgniteInternalTx tx = startTx()) { // Lock the very first existing parents and possibly the leaf as well. Map<IgfsPath, IgfsPath> pathToParent = new HashMap<>(); @@ -3101,8 +3099,8 @@ public class IgfsMetaManager extends IgfsManager { // First step: add existing to trash listing. IgniteUuid oldId = pathIds.lastId(); - id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor(oldId.toString(), - new IgfsListingEntry(oldInfo))); + id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor( + IgfsUtils.composeNameForTrash(path, oldId), new IgfsListingEntry(oldInfo))); // Second step: replace ID in parent directory. String name = pathIds.lastPart(); http://git-wip-us.apache.org/repos/asf/ignite/blob/519ae631/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 1b97565..2b31868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -105,6 +105,9 @@ public class IgfsUtils { /** Maximum number of file unlock transaction retries when topology changes. */ private static final int MAX_CACHE_TX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100); + /** Separator between id and name parts in the trash name. */ + private static final char TRASH_NAME_SEPARATOR = '|'; + /** * Static initializer. */ @@ -308,6 +311,7 @@ public class IgfsUtils { assert path != null; GridEventStorageManager evts = kernalCtx.event(); + ClusterNode locNode = kernalCtx.discovery().localNode(); if (evts.isRecordable(type)) { @@ -681,4 +685,34 @@ public class IgfsUtils { else return null; } + + /** + * Parses the TRASH file name to extract the original path. + * + * @param name The TRASH short (entry) name. + * @return The original path, or null in case of failure. + */ + public static IgfsPath extractOriginalPathFromTrash(String name) { + int idx = name.indexOf(TRASH_NAME_SEPARATOR); + + assert idx >= 0; + + String path = name.substring(idx + 1, name.length()); + + return new IgfsPath(path); + } + + /** + * Creates short name of the file in TRASH directory. + * The name consists of the whole file path and its unique id. + * Upon file cleanup this name will be parsed to extract the path. + * Note that in contrast to common practice the composed name contains '/' character. + * + * @param path The full path of the deleted file. + * @param id The file id. + * @return The new short name for trash directory. + */ + static String composeNameForTrash(IgfsPath path, IgniteUuid id) { + return id.toString() + TRASH_NAME_SEPARATOR + path.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/519ae631/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java index 15eb9c8..4c34fea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java @@ -83,8 +83,6 @@ public class IgfsMetaDirectoryListingRemoveProcessor implements EntryProcessor<I Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); - listing.putAll(fileInfo.listing()); - IgfsListingEntry oldEntry = listing.get(fileName); if (oldEntry == null || !oldEntry.fileId().equals(fileId)) http://git-wip-us.apache.org/repos/asf/ignite/blob/519ae631/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java index de20a4f..9c2c6e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java @@ -53,6 +53,7 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE; +import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED; import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; import static org.apache.ignite.events.EventType.EVT_TASK_FAILED; @@ -424,7 +425,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest public void testTwoFiles() throws Exception { final List<Event> evtList = new ArrayList<>(); - final int evtsCnt = 4 + 3 + 1 + 1; + final int evtsCnt = 4 + 3 + 2 + 2; final CountDownLatch latch = new CountDownLatch(evtsCnt); @@ -494,9 +495,11 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest assertEquals(0, evt.dataSize()); assertOneToOne( - evtList.subList(7, 9), + evtList.subList(7, 11), new EventPredicate(EVT_IGFS_FILE_DELETED, new IgfsPath("/dir1/file1")), - new EventPredicate(EVT_IGFS_FILE_DELETED, new IgfsPath("/dir1/file2")) + new EventPredicate(EVT_IGFS_FILE_PURGED, new IgfsPath("/dir1/file1")), + new EventPredicate(EVT_IGFS_FILE_DELETED, new IgfsPath("/dir1/file2")), + new EventPredicate(EVT_IGFS_FILE_PURGED, new IgfsPath("/dir1/file2")) ); } @@ -680,7 +683,11 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest public void testSingleFileOverwrite() throws Exception { final List<Event> evtList = new ArrayList<>(); - final int evtsCnt = 1 + 3 + 1; + // NB: In case of create-overwrite FILE_PURGED event will be sent in PRIMARY IGFS mode only. + final boolean awaitForPurgeEvt + = grid(1).configuration().getFileSystemConfiguration()[0].getDefaultMode() == IgfsMode.PRIMARY; + + final int evtsCnt = 1 + 4 + (awaitForPurgeEvt ? 1 : 0); final CountDownLatch latch = new CountDownLatch(evtsCnt); @@ -700,7 +707,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest igfs.create(file, false).close(); // Will generate create, open and close events. - igfs.create(file, true).close(); // Will generate only OPEN_WRITE & close events. + igfs.create(file, true).close(); // Will generate PURGE (async), OPEN_WRITE & close events. try { igfs.create(file, false).close(); // Won't generate any event. @@ -734,6 +741,13 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest @Override public boolean apply(Event e) { IgfsEvent e0 = (IgfsEvent)e; + return e0.type() == EVT_IGFS_FILE_PURGED && e0.path().equals(file1); + } + }, + new P1<Event>() { + @Override public boolean apply(Event e) { + IgfsEvent e0 = (IgfsEvent)e; + return e0.type() == EVT_IGFS_FILE_OPENED_WRITE && e0.path().equals(file1); } }, @@ -811,7 +825,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest evt = (IgfsEvent)evtList.get(4); assertEquals(EVT_IGFS_FILE_CLOSED_READ, evt.type()); assertEquals(new IgfsPath("/file1"), evt.path()); - assertEquals((long)dataSize, evt.dataSize()); + assertEquals((long) dataSize, evt.dataSize()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/519ae631/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index 1dd665a..a9d7bad 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -81,6 +81,8 @@ public class IgfsEventsTestSuite extends TestSuite { @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); endpointCfg.setType(IgfsIpcEndpointType.SHMEM); @@ -100,6 +102,8 @@ public class IgfsEventsTestSuite extends TestSuite { @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); endpointCfg.setType(IgfsIpcEndpointType.TCP); @@ -224,6 +228,8 @@ public class IgfsEventsTestSuite extends TestSuite { @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); + igfsCfg.setDefaultMode(IgfsMode.PRIMARY); + igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( "igfs://igfs-secondary:[email protected]:11500/", "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
