IGNITE-2861: IGFS: Moved metadata processors into separate top-level classes to simplify code. Also removed unused code from IgfsMetaManager.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/865e376a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/865e376a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/865e376a Branch: refs/heads/ignite-2849 Commit: 865e376ad469bf9929b9f6f98ff0882e44c951a8 Parents: c506c44 Author: thatcoach <[email protected]> Authored: Sat Mar 19 21:13:35 2016 +0300 Committer: thatcoach <[email protected]> Committed: Sat Mar 19 21:13:35 2016 +0300 ---------------------------------------------------------------------- .../igfs/IgfsFragmentizerManager.java | 141 +-- .../processors/igfs/IgfsMetaManager.java | 1142 ++---------------- .../internal/processors/igfs/IgfsPathIds.java | 2 +- .../meta/IgfsMetaDirectoryCreateProcessor.java | 117 ++ .../IgfsMetaDirectoryListingAddProcessor.java | 92 ++ ...IgfsMetaDirectoryListingRemoveProcessor.java | 89 ++ ...gfsMetaDirectoryListingReplaceProcessor.java | 84 ++ .../igfs/meta/IgfsMetaFileCreateProcessor.java | 110 ++ .../igfs/meta/IgfsMetaFileLockProcessor.java | 63 + .../meta/IgfsMetaFileRangeDeleteProcessor.java | 74 ++ .../meta/IgfsMetaFileRangeUpdateProcessor.java | 81 ++ .../meta/IgfsMetaFileReserveSpaceProcessor.java | 75 ++ .../igfs/meta/IgfsMetaFileUnlockProcessor.java | 60 + .../igfs/meta/IgfsMetaUpdatePathProcessor.java | 66 + .../meta/IgfsMetaUpdatePropertiesProcessor.java | 78 ++ .../igfs/meta/IgfsMetaUpdateTimesProcessor.java | 68 ++ 16 files changed, 1155 insertions(+), 1187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java index 194a8ac..99e7cd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java @@ -26,13 +26,14 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileRangeDeleteProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileRangeUpdateProcessor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; @@ -40,13 +41,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -294,7 +288,7 @@ public class IgfsFragmentizerManager extends IgfsManager { case 
RANGE_STATUS_INITIAL: { // Mark range as moving. updated = igfsCtx.meta().updateInfo( - fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVING)); + fileId, new IgfsMetaFileRangeUpdateProcessor(range, RANGE_STATUS_MOVING)); if (updated == null) { igfsCtx.data().cleanBlocks(fileInfo, range, true); @@ -311,7 +305,7 @@ public class IgfsFragmentizerManager extends IgfsManager { // Mark range as moved. updated = igfsCtx.meta().updateInfo( - fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVED)); + fileId, new IgfsMetaFileRangeUpdateProcessor(range, RANGE_STATUS_MOVED)); if (updated == null) { igfsCtx.data().cleanBlocks(fileInfo, range, true); @@ -327,7 +321,7 @@ public class IgfsFragmentizerManager extends IgfsManager { igfsCtx.data().cleanBlocks(fileInfo, range, false); // Remove range from map. - updated = igfsCtx.meta().updateInfo(fileId, new RangeDeleteProcessor(range)); + updated = igfsCtx.meta().updateInfo(fileId, new IgfsMetaFileRangeDeleteProcessor(range)); if (updated == null) igfsCtx.data().cleanBlocks(fileInfo, range, true); @@ -343,131 +337,6 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** - * Update range processor. - */ - private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Range. */ - private IgfsFileAffinityRange range; - - /** Status. */ - private int status; - - /** - * Constructor. - */ - public RangeUpdateProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param range Range. - * @param status Status. - */ - public RangeUpdateProcessor(IgfsFileAffinityRange range, int status) { - this.range = range; - this.status = status; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... 
args) - throws EntryProcessorException { - IgfsEntryInfo oldInfo = entry.getValue(); - - IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); - - newMap.updateRangeStatus(range, status); - - IgfsEntryInfo newInfo = oldInfo.fileMap(newMap); - - entry.setValue(newInfo); - - return newInfo; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(range); - out.writeInt(status); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - range = (IgfsFileAffinityRange)in.readObject(); - status = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(RangeUpdateProcessor.class, this); - } - } - - /** - * Delete range processor. - */ - private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Range. */ - private IgfsFileAffinityRange range; - - /** - * Constructor. - */ - public RangeDeleteProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param range Range. - */ - public RangeDeleteProcessor(IgfsFileAffinityRange range) { - this.range = range; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... 
args) - throws EntryProcessorException { - IgfsEntryInfo oldInfo = entry.getValue(); - - IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); - - newMap.deleteRange(range); - - IgfsEntryInfo newInfo = oldInfo.fileMap(newMap); - - entry.setValue(newInfo); - - return newInfo; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(range); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - range = (IgfsFileAffinityRange)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(RangeDeleteProcessor.class, this); - } - } - - /** * Fragmentizer coordinator thread. */ private class FragmentizerCoordinator extends GridWorker implements GridLocalEventListener, GridMessageListener { http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index d66d9be..1aa49ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -44,7 +44,17 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor; 
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileCreateProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileLockProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileReserveSpaceProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileUnlockProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingAddProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRemoveProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingReplaceProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePathProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePropertiesProcessor; +import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdateTimesProcessor; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridClosureException; @@ -52,22 +62,15 @@ import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import javax.cache.processor.EntryProcessor; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; +import 
javax.cache.processor.EntryProcessorResult; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; @@ -89,7 +92,6 @@ import java.util.concurrent.CountDownLatch; /** * Cache based structure (meta data) manager. */ -@SuppressWarnings("all") public class IgfsMetaManager extends IgfsManager { /** Comparator for Id sorting. */ private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR @@ -161,6 +163,7 @@ public class IgfsMetaManager extends IgfsManager { } /** {@inheritDoc} */ + @SuppressWarnings("RedundantCast") @Override protected void onKernalStart0() throws IgniteCheckedException { metaCache = igfsCtx.kernalContext().cache().getOrStartCache(cfg.getMetaCacheName()); @@ -511,9 +514,7 @@ public class IgfsMetaManager extends IgfsManager { assert fileId != null; - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { // Lock file ID for this transaction. IgfsEntryInfo oldInfo = info(fileId); @@ -532,9 +533,6 @@ public class IgfsMetaManager extends IgfsManager { catch (GridClosureException e) { throw U.cast(e); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -593,12 +591,12 @@ public class IgfsMetaManager extends IgfsManager { throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " + "found): " + fileId)); - if (!info.lockId().equals(oldInfo.lockId())) + if (!F.eq(info.lockId(), oldInfo.lockId())) throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " + "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']'); - id2InfoPrj.invoke(fileId, new FileUnlockProcessor(modificationTime)); + id2InfoPrj.invoke(fileId, new IgfsMetaFileUnlockProcessor(modificationTime)); return null; } @@ -701,8 +699,6 @@ public class IgfsMetaManager extends IgfsManager { throws IgniteCheckedException { assert IgfsUtils.isRootOrTrashId(id); - long time = System.currentTimeMillis(); - IgfsEntryInfo info = 
IgfsUtils.createDirectory(id); IgfsEntryInfo oldInfo = id2InfoPrj.getAndPutIfAbsent(id, info); @@ -898,9 +894,7 @@ public class IgfsMetaManager extends IgfsManager { srcPathIds.addExistingIds(lockIds); dstPathIds.addExistingIds(lockIds); - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { // Obtain the locks. final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); @@ -942,9 +936,6 @@ public class IgfsMetaManager extends IgfsManager { // Set the new path to the info to simplify event creation: return srcInfo.path(newPath); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -956,41 +947,6 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Verify path integrity. - * - * @param path Path to verify. - * @param expIds Expected IDs for this path. Might contain additional elements, e.g. because they were created - * on a child path. - * @param infos Locked infos. - * @return verification result. - */ - private static boolean verifyPathIntegrity(IgfsPath path, List<IgniteUuid> expIds, - Map<IgniteUuid, IgfsEntryInfo> infos) { - List<String> pathParts = path.components(); - - assert pathParts.size() < expIds.size(); - - for (int i = 0; i < pathParts.size(); i++) { - IgniteUuid parentId = expIds.get(i); - - // If parent ID is null, it doesn't exist. - if (parentId != null) { - IgfsEntryInfo parentInfo = infos.get(parentId); - - // If parent info is null, it doesn't exist. - if (parentInfo != null) { - if (parentInfo.hasChild(pathParts.get(i), expIds.get(i + 1))) - continue; - } - } - - return false; - } - - return true; - } - - /** * Move or rename file in existing transaction. * * @param fileId File ID to move or rename. @@ -1069,6 +1025,7 @@ public class IgfsMetaManager extends IgfsManager { * elements moved to TRASH folder. * @throws IgniteCheckedException On error. 
*/ + @SuppressWarnings("RedundantCast") IgniteUuid format() throws IgniteCheckedException { if (busyLock.enterBusy()) { try { @@ -1076,9 +1033,7 @@ public class IgfsMetaManager extends IgfsManager { IgniteUuid trashId = IgfsUtils.randomTrashId(); - final IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { // NB: We may lock root because its id is less than any other id: final IgfsEntryInfo rootInfo = lockIds(IgfsUtils.ROOT_ID, trashId).get(IgfsUtils.ROOT_ID); @@ -1097,7 +1052,7 @@ public class IgfsMetaManager extends IgfsManager { IgfsEntryInfo newInfo = IgfsUtils.createDirectory( IgniteUuid.randomUuid(), transferListing, - (Map<String,String>)null + (Map<String, String>) null ); createNewEntry(newInfo, trashId, newInfo.id().toString()); @@ -1112,9 +1067,6 @@ public class IgfsMetaManager extends IgfsManager { return newInfo.id(); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1127,9 +1079,8 @@ public class IgfsMetaManager extends IgfsManager { /** * Move path to the trash directory. * - * @param parentId Parent ID. - * @param pathName Path name. - * @param pathId Path ID. + * @param path Path. + * @param recursive Recursive flag. * @return ID of an entry located directly under the trash directory. * @throws IgniteCheckedException If failed. */ @@ -1159,9 +1110,7 @@ public class IgfsMetaManager extends IgfsManager { allIds.add(trashId); - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { // Lock participants. Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds); @@ -1197,9 +1146,6 @@ public class IgfsMetaManager extends IgfsManager { return victimId; } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1220,9 +1166,9 @@ public class IgfsMetaManager extends IgfsManager { * @return ID of an entry located directly under the trash directory. * @throws IgniteCheckedException If failed. 
*/ + @SuppressWarnings("RedundantCast") @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, @Nullable String name, IgniteUuid id, - IgniteUuid trashId) - throws IgniteCheckedException { + IgniteUuid trashId) throws IgniteCheckedException { validTxState(true); IgniteUuid resId; @@ -1268,7 +1214,7 @@ public class IgfsMetaManager extends IgfsManager { // Remove listing entries from root. for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet()) id2InfoPrj.invoke(IgfsUtils.ROOT_ID, - new ListingRemoveProcessor(entry.getKey(), entry.getValue().fileId())); + new IgfsMetaDirectoryListingRemoveProcessor(entry.getKey(), entry.getValue().fileId())); resId = newInfo.id(); } @@ -1304,9 +1250,7 @@ public class IgfsMetaManager extends IgfsManager { assert listing != null; validTxState(false); - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { Collection<IgniteUuid> res = new HashSet<>(); // Obtain all necessary locks in one hop. 
@@ -1367,9 +1311,6 @@ public class IgfsMetaManager extends IgfsManager { return res; } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1395,17 +1336,13 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - IgniteInternalTx tx = startTx(); - - try { - boolean res = false; - + try (IgniteInternalTx tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> infos = lockIds(parentId, id); IgfsEntryInfo victim = infos.get(id); if (victim == null) - return res; + return false; assert victim.isDirectory() || IgfsUtils.DELETE_LOCK_ID.equals(victim.lockId()) : " isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId(); @@ -1419,19 +1356,16 @@ public class IgfsMetaManager extends IgfsManager { IgfsListingEntry childEntry = parentInfo.listing().get(name); if (childEntry != null) - id2InfoPrj.invoke(parentId, new ListingRemoveProcessor(name, id)); + id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingRemoveProcessor(name, id)); id2InfoPrj.remove(id); - res = true; - } + tx.commit(); - tx.commit(); + return true; + } - return res; - } - finally { - tx.close(); + return false; } } finally { @@ -1499,7 +1433,7 @@ public class IgfsMetaManager extends IgfsManager { if (oldInfo == null) return null; - return invokeAndGet(fileId, new UpdatePropertiesProcessor(props)); + return invokeAndGet(fileId, new IgfsMetaUpdatePropertiesProcessor(props)); } catch (GridClosureException e) { throw U.cast(e); @@ -1520,18 +1454,13 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { IgfsEntryInfo info = updatePropertiesNonTx(fileId, props); tx.commit(); return info; } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1560,16 +1489,15 @@ public class IgfsMetaManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']'); - IgniteInternalTx tx = startTx(); - 
- try { + try (IgniteInternalTx tx = startTx()) { // Lock file ID for this transaction. IgfsEntryInfo oldInfo = info(fileId); if (oldInfo == null) throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']'); - IgfsEntryInfo newInfo = invokeAndGet(fileId, new FileReserveSpaceProcessor(space, affRange)); + IgfsEntryInfo newInfo = + invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange)); tx.commit(); @@ -1578,9 +1506,6 @@ public class IgfsMetaManager extends IgfsManager { catch (GridClosureException e) { throw U.cast(e); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1610,9 +1535,7 @@ public class IgfsMetaManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']'); - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { // Lock file ID for this transaction. IgfsEntryInfo oldInfo = info(fileId); @@ -1623,7 +1546,7 @@ public class IgfsMetaManager extends IgfsManager { if (newInfo == null) throw fsException("Failed to update file info with null value" + - " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']'); + " [oldInfo=" + oldInfo + ", newInfo=null, proc=" + proc + ']'); if (!oldInfo.id().equals(newInfo.id())) throw fsException("Failed to update file info (file IDs differ)" + @@ -1640,9 +1563,6 @@ public class IgfsMetaManager extends IgfsManager { catch (GridClosureException e) { throw U.cast(e); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1679,9 +1599,7 @@ public class IgfsMetaManager extends IgfsManager { assert lockIds.size() == pathIds.count(); // Start TX. - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (!pathIds.verifyIntegrity(lockInfos)) @@ -1710,9 +1628,6 @@ public class IgfsMetaManager extends IgfsManager { // We are done. 
return true; } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1735,18 +1650,13 @@ public class IgfsMetaManager extends IgfsManager { try { validTxState(false); - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling); tx.commit(); return !F.eq(prev, val); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -1795,7 +1705,7 @@ public class IgfsMetaManager extends IgfsManager { throw fsException("Failed to create new metadata entry due to ID conflict: " + info.id()); if (parentId != null) - id2InfoPrj.invoke(parentId, new ListingAddProcessor(name, new IgfsListingEntry(info))); + id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingAddProcessor(name, new IgfsListingEntry(info))); } /** @@ -1812,8 +1722,8 @@ public class IgfsMetaManager extends IgfsManager { IgniteUuid destId, String destName) throws IgniteCheckedException { validTxState(true); - id2InfoPrj.invoke(srcId, new ListingRemoveProcessor(srcName, entry.fileId())); - id2InfoPrj.invoke(destId, new ListingAddProcessor(destName, entry)); + id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId())); + id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry)); } /** @@ -1825,7 +1735,7 @@ public class IgfsMetaManager extends IgfsManager { * @throws IgniteCheckedException If failed. 
*/ private IgfsEntryInfo invokeLock(IgniteUuid id, boolean delete) throws IgniteCheckedException { - return invokeAndGet(id, new FileLockProcessor(createFileLockId(delete))); + return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(delete))); } /** @@ -1838,7 +1748,7 @@ public class IgfsMetaManager extends IgfsManager { private void invokeUpdatePath(IgniteUuid id, IgfsPath path) throws IgniteCheckedException { validTxState(true); - id2InfoPrj.invoke(id, new UpdatePathProcessor(path)); + id2InfoPrj.invoke(id, new IgfsMetaUpdatePathProcessor(path)); } /** @@ -1853,7 +1763,11 @@ public class IgfsMetaManager extends IgfsManager { throws IgniteCheckedException { validTxState(true); - return id2InfoPrj.invoke(id, proc).get(); + EntryProcessorResult<IgfsEntryInfo> res = id2InfoPrj.invoke(id, proc); + + assert res != null; + + return res.get(); } /** @@ -1985,6 +1899,8 @@ public class IgfsMetaManager extends IgfsManager { ); // Add new file info to the listing optionally removing the previous one. + assert parentInfo != null; + IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo); if (oldId != null) { @@ -1998,12 +1914,12 @@ public class IgfsMetaManager extends IgfsManager { path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']'); id2InfoPrj.remove(oldId); // Remove the old one. - id2InfoPrj.invoke(parentInfo.id(), - new ListingRemoveProcessor(path.name(), parentInfo.listing().get(path.name()).fileId())); + id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor( + path.name(), parentInfo.listing().get(path.name()).fileId())); createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one. - IgniteInternalFuture<?> delFut = igfsCtx.data().delete(oldInfo); + igfsCtx.data().delete(oldInfo); } // Record CREATE event if needed. 
@@ -2084,13 +2000,13 @@ public class IgfsMetaManager extends IgfsManager { if (remainder > 0) { int blockIdx = (int)(len / blockSize); - IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize); + try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) { + IgniteInternalFuture<byte[]> fut = + igfsCtx.data().dataBlock(info, path, blockIdx, reader); - try { - igfsCtx.data().dataBlock(info, path, blockIdx, reader).get(); - } - finally { - reader.close(); + assert fut != null; + + fut.get(); } } @@ -2366,12 +2282,8 @@ public class IgfsMetaManager extends IgfsManager { fs.rename(src, dest); // Rename was successful, perform compensation in the local file system. - if (destInfo == null) { - // Move and rename. - assert destParentInfo != null; - + if (destInfo == null) moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), dest.name(), destParentInfo.id()); - } else { // Move. if (destInfo.isFile()) @@ -2622,6 +2534,8 @@ public class IgfsMetaManager extends IgfsManager { status.modificationTime() ); + assert parentInfo != null; + IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), components.get(i), curInfo); if (oldId != null) @@ -2669,13 +2583,9 @@ public class IgfsMetaManager extends IgfsManager { * @return Result of task execution. * @throws IgniteCheckedException If failed. */ - private <T> T synchronizeAndExecute(SynchronizationTask<T> task, - IgfsSecondaryFileSystem fs, - boolean strict, - @Nullable Collection<IgniteUuid> extraLockIds, - IgfsPath... paths) - throws IgniteCheckedException - { + @SuppressWarnings({"Contract", "ConstantConditions"}) + private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgfsSecondaryFileSystem fs, boolean strict, + @Nullable Collection<IgniteUuid> extraLockIds, IgfsPath... 
paths) throws IgniteCheckedException { assert task != null; assert fs != null; assert paths != null && paths.length > 0; @@ -2696,9 +2606,8 @@ public class IgfsMetaManager extends IgfsManager { pathIds.add(fileIds(path)); // Start pessimistic. - IgniteInternalTx tx = startTx(); - try { + try (IgniteInternalTx tx = startTx()) { // Lock the very first existing parents and possibly the leaf as well. Map<IgfsPath, IgfsPath> pathToParent = new HashMap<>(); @@ -2864,34 +2773,12 @@ public class IgfsMetaManager extends IgfsManager { else throw e; } - finally { - tx.close(); - } } return res; } /** - * Update cached value with closure. - * - * @param cache Cache projection to work with. - * @param key Key to retrieve/update the value for. - * @param c Closure to apply to cached value. - * @return {@code True} if value was stored in cache, {@code false} otherwise. - * @throws IgniteCheckedException If operation failed. - */ - private <K, V> boolean putx(IgniteInternalCache<K, V> cache, K key, IgniteClosure<V, V> c) - throws IgniteCheckedException { - validTxState(true); - - V oldVal = cache.get(key); - V newVal = c.apply(oldVal); - - return newVal == null ? cache.remove(key) : cache.put(key, newVal); - } - - /** * Check transaction is (not) started. * * @param inTx Expected transaction state. @@ -2927,16 +2814,14 @@ public class IgfsMetaManager extends IgfsManager { validTxState(false); // Start pessimistic transaction. - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> infoMap = lockIds(fileId, parentId); IgfsEntryInfo fileInfo = infoMap.get(fileId); if (fileInfo == null) throw fsException(new IgfsPathNotFoundException("Failed to update times " + - "(path was not found): " + fileName)); + "(path was not found): " + fileName)); IgfsEntryInfo parentInfo = infoMap.get(parentId); @@ -2947,20 +2832,17 @@ public class IgfsMetaManager extends IgfsManager { // Validate listing. 
if (!parentInfo.hasChild(fileName, fileId)) throw fsException(new IgfsConcurrentModificationException("Failed to update times " + - "(file concurrently modified): " + fileName)); + "(file concurrently modified): " + fileName)); assert parentInfo.isDirectory(); - id2InfoPrj.invoke(fileId, new UpdateTimesProcessor( + id2InfoPrj.invoke(fileId, new IgfsMetaUpdateTimesProcessor( accessTime == -1 ? fileInfo.accessTime() : accessTime, modificationTime == -1 ? fileInfo.modificationTime() : modificationTime) ); tx.commit(); } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -2980,7 +2862,7 @@ public class IgfsMetaManager extends IgfsManager { } /** - * @param msg Error message. + * @param err Unchecked exception. * @return Checked exception. */ private static IgniteCheckedException fsException(IgfsException err) { @@ -3011,365 +2893,6 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Path descriptor. - */ - private static class PathDescriptor { - /** Path. */ - private final IgfsPath path; - - /** Resolved IDs. */ - private final List<IgniteUuid> ids; - - /** Parent path. */ - private IgfsPath parentPath; - - /** Parent path info. */ - private IgfsEntryInfo parentInfo; - - /** - * Constructor. - * - * @param path Path. - * @param ids Resolved path IDs. - * @param parentPath Parent path. - * @param parentInfo Parent info. - */ - PathDescriptor(IgfsPath path, List<IgniteUuid> ids, IgfsPath parentPath, IgfsEntryInfo parentInfo) { - assert path != null; - assert ids != null && !ids.isEmpty(); - assert parentPath == null && parentInfo == null || parentPath != null && parentInfo != null; - assert parentPath == null || parentPath != null && path.isSubDirectoryOf(parentPath); - - this.path = path; - this.ids = ids; - this.parentPath = parentPath; - this.parentInfo = parentInfo; - } - - /** - * Get resolved path ids. - * - * @return Path ids. - */ - private Collection<IgniteUuid> ids() { - return ids; - } - - /** - * Get path ID from the end. E.g. 
endId(1) will return the last element. - * @param i Element index from the end. - * - * @return Path ID from the end. - */ - private IgniteUuid endId(int i) { - return ids.get(ids.size() - i); - } - - /** - * Update ID with the given index. - * - * @param newParentPath New parent path. - * @param newParentInfo New parent info. - */ - private void updateParent(IgfsPath newParentPath, IgfsEntryInfo newParentInfo) { - assert newParentPath != null; - assert newParentInfo != null; - assert path.isSubDirectoryOf(newParentPath); - - parentPath = newParentPath; - parentInfo = newParentInfo; - - ids.set(newParentPath.components().size(), newParentInfo.id()); - } - - /** - * Get parent path. - * - * @return Parent path. - */ - private IgfsPath parentPath() { - return parentPath; - } - - /** - * Get parent path info. - * - * @return Parent path info. - */ - private IgfsEntryInfo parentInfo() { - return parentInfo; - } - } - - /** - * Remove entry from directory listing. - */ - @GridInternal - private static final class ListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** File name. */ - private String fileName; - - /** Expected ID. */ - private IgniteUuid fileId; - - /** - * Default constructor. - */ - public ListingRemoveProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param fileName File name. - * @param fileId File ID. - */ - public ListingRemoveProcessor(String fileName, IgniteUuid fileId) { - this.fileName = fileName; - this.fileId = fileId; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... 
args) - throws EntryProcessorException { - IgfsEntryInfo fileInfo = e.getValue(); - - assert fileInfo != null; - assert fileInfo.isDirectory(); - - Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); - - listing.putAll(fileInfo.listing()); - - IgfsListingEntry oldEntry = listing.get(fileName); - - if (oldEntry == null || !oldEntry.fileId().equals(fileId)) - throw new IgniteException("Directory listing doesn't contain expected file" + - " [listing=" + listing + ", fileName=" + fileName + "]"); - - // Modify listing in-place. - listing.remove(fileName); - - e.setValue(fileInfo.listing(listing)); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, fileName); - U.writeGridUuid(out, fileId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fileName = U.readString(in); - fileId = U.readGridUuid(in); - } - } - - /** - * Update directory listing closure. - */ - @GridInternal - private static final class ListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** File name to add into parent listing. */ - private String fileName; - - /** File ID.*/ - private IgfsListingEntry entry; - - /** - * Empty constructor required for {@link Externalizable}. - * - */ - public ListingAddProcessor() { - // No-op. - } - - /** - * Constructs update directory listing closure. - * - * @param fileName File name to add into parent listing. - * @param entry Listing entry to add or remove. - */ - private ListingAddProcessor(String fileName, IgfsListingEntry entry) { - assert fileName != null; - assert entry != null; - - this.fileName = fileName; - this.entry = entry; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... 
args) { - IgfsEntryInfo fileInfo = e.getValue(); - - assert fileInfo.isDirectory(); - - Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); - - // Modify listing in-place. - IgfsListingEntry oldEntry = listing.put(fileName, entry); - - if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId())) - throw new IgniteException("Directory listing contains unexpected file" + - " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry + - ", oldEntry=" + oldEntry + ']'); - - e.setValue(fileInfo.listing(listing)); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, fileName); - out.writeObject(entry); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fileName = U.readString(in); - entry = (IgfsListingEntry)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ListingAddProcessor.class, this); - } - } - - /** - * Listing replace processor. - */ - private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Name. */ - private String name; - - /** New ID. */ - private IgniteUuid id; - - /** - * Constructor. - */ - public ListingReplaceProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param name Name. - * @param id ID. - */ - public ListingReplaceProcessor(String name, IgniteUuid id) { - this.name = name; - this.id = id; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) - throws EntryProcessorException { - IgfsEntryInfo fileInfo = e.getValue(); - - assert fileInfo.isDirectory(); - - Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); - - // Modify listing in-place. 
- IgfsListingEntry oldEntry = listing.get(name); - - if (oldEntry == null) - throw new IgniteException("Directory listing doesn't contain expected entry: " + name); - - listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory())); - - e.setValue(fileInfo.listing(listing)); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, name); - out.writeObject(id); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - name = U.readString(in); - id = (IgniteUuid)in.readObject(); - } - } - - /** - * Update path closure. - */ - @GridInternal - private static final class UpdatePathProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** New path. */ - private IgfsPath path; - - /** - * @param path Path. - */ - private UpdatePathProcessor(IgfsPath path) { - this.path = path; - } - - /** - * Default constructor (required by Externalizable). - */ - public UpdatePathProcessor() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) { - IgfsEntryInfo info = e.getValue(); - - IgfsEntryInfo newInfo = info.path(path); - - e.setValue(newInfo); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(path); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - path = (IgfsPath)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(UpdatePathProcessor.class, this); - } - } - - /** * Append routine. * * @param path Path. @@ -3409,9 +2932,7 @@ public class IgfsMetaManager extends IgfsManager { pathIds.addSurrogateIds(lockIds); // Start TX. 
- IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (!pathIds.verifyIntegrity(lockInfos)) @@ -3458,9 +2979,6 @@ public class IgfsMetaManager extends IgfsManager { return new T2<>(res.info(), res.parentId()); } } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -3472,16 +2990,17 @@ public class IgfsMetaManager extends IgfsManager { } /** - * Create a new file. + * Create a file. * * @param path Path. - * @param bufSize Buffer size. + * @param dirProps Directory properties. * @param overwrite Overwrite flag. + * @param blockSize Block size. * @param affKey Affinity key. - * @param replication Replication factor. - * @param props Properties. - * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method. - * @return Tuple containing the created file info and its parent id. + * @param evictExclude Evict exclude flag. + * @param fileProps File properties. + * @return Tuple containing the created file info and its parent id. + * @throws IgniteCheckedException If failed. */ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> create( final IgfsPath path, @@ -3518,9 +3037,7 @@ public class IgfsMetaManager extends IgfsManager { } // Start TX. - IgniteInternalTx tx = startTx(); - - try { + try (IgniteInternalTx tx = startTx()) { Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds); if (!pathIds.verifyIntegrity(lockInfos)) @@ -3548,19 +3065,19 @@ public class IgfsMetaManager extends IgfsManager { // First step: add existing to trash listing. IgniteUuid oldId = pathIds.lastId(); - id2InfoPrj.invoke(trashId, new ListingAddProcessor(oldId.toString(), + id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor(oldId.toString(), new IgfsListingEntry(oldInfo))); // Second step: replace ID in parent directory.
String name = pathIds.lastPart(); IgniteUuid parentId = pathIds.lastParentId(); - id2InfoPrj.invoke(parentId, new ListingReplaceProcessor(name, overwriteId)); + id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingReplaceProcessor(name, overwriteId)); // Third step: create the file. long createTime = System.currentTimeMillis(); - IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime, + IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new IgfsMetaFileCreateProcessor(createTime, fileProps, blockSize, affKey, createFileLockId(false), evictExclude)); // Fourth step: update path of remove file. @@ -3592,9 +3109,6 @@ public class IgfsMetaManager extends IgfsManager { return new T2<>(res.info(), res.parentId()); } } - finally { - tx.close(); - } } finally { busyLock.leaveBusy(); @@ -3682,7 +3196,7 @@ public class IgfsMetaManager extends IgfsManager { return null; // First step: add new entry to the last existing element. - id2InfoPrj.invoke(lastExistingInfo.id(), new ListingAddProcessor(curPart, + id2InfoPrj.invoke(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart, new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx)))); // Events support. @@ -3699,7 +3213,7 @@ public class IgfsMetaManager extends IgfsManager { String nextPart = pathIds.part(nextIdx); IgniteUuid nextId = pathIds.surrogateId(nextIdx); - id2InfoPrj.invoke(curId, new DirectoryCreateProcessor(createTime, dirProps, + id2InfoPrj.invoke(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps, nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx)))); // Save event. 
@@ -3720,9 +3234,9 @@ public class IgfsMetaManager extends IgfsManager { IgfsEntryInfo info; if (dir) - info = invokeAndGet(curId, new DirectoryCreateProcessor(createTime, dirProps)); + info = invokeAndGet(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps)); else - info = invokeAndGet(curId, new FileCreateProcessor(createTime, fileProps, + info = invokeAndGet(curId, new IgfsMetaFileCreateProcessor(createTime, fileProps, blockSize, affKey, createFileLockId(false), evictExclude)); createdPaths.add(pathIds.path()); @@ -3752,476 +3266,4 @@ public class IgfsMetaManager extends IgfsManager { else IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED); } - - /** - * File create processor. - */ - private static class FileCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Create time. */ - private long createTime; - - /** Properties. */ - private Map<String, String> props; - - /** Block size. */ - private int blockSize; - - /** Affintiy key. */ - private IgniteUuid affKey; - - /** Lcok ID. */ - private IgniteUuid lockId; - - /** Evict exclude flag. */ - private boolean evictExclude; - - /** - * Constructor. - */ - public FileCreateProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param createTime Create time. - * @param props Properties. - * @param blockSize Block size. - * @param affKey Affinity key. - * @param lockId Lock ID. - * @param evictExclude Evict exclude flag. 
- */ - public FileCreateProcessor(long createTime, Map<String, String> props, int blockSize, - @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude) { - this.createTime = createTime; - this.props = props; - this.blockSize = blockSize; - this.affKey = affKey; - this.lockId = lockId; - this.evictExclude = evictExclude; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) - throws EntryProcessorException { - IgfsEntryInfo info = IgfsUtils.createFile( - entry.getKey(), - blockSize, - 0L, - affKey, - lockId, - evictExclude, - props, - createTime, - createTime - ); - - entry.setValue(info); - - return info; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(createTime); - U.writeStringMap(out, props); - out.writeInt(blockSize); - out.writeObject(affKey); - out.writeObject(lockId); - out.writeBoolean(evictExclude); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - createTime = in.readLong(); - props = U.readStringMap(in); - blockSize = in.readInt(); - affKey = (IgniteUuid)in.readObject(); - lockId = (IgniteUuid)in.readObject(); - evictExclude = in.readBoolean(); - } - } - - /** - * Directory create processor. - */ - private static class DirectoryCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Create time. */ - private long createTime; - - /** Properties. */ - private Map<String, String> props; - - /** Child name (optional). */ - private String childName; - - /** Child entry (optional. */ - private IgfsListingEntry childEntry; - - /** - * Constructor. - */ - public DirectoryCreateProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param createTime Create time. - * @param props Properties. 
- */ - public DirectoryCreateProcessor(long createTime, Map<String, String> props) { - this(createTime, props, null, null); - } - - /** - * Constructor. - * - * @param createTime Create time. - * @param props Properties. - * @param childName Child name. - * @param childEntry Child entry. - */ - public DirectoryCreateProcessor(long createTime, Map<String, String> props, String childName, - IgfsListingEntry childEntry) { - this.createTime = createTime; - this.props = props; - this.childName = childName; - this.childEntry = childEntry; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) - throws EntryProcessorException { - - IgfsEntryInfo info = IgfsUtils.createDirectory( - entry.getKey(), - null, - props, - createTime, - createTime - ); - - if (childName != null) - info = info.listing(Collections.singletonMap(childName, childEntry)); - - entry.setValue(info); - - return info; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(createTime); - U.writeStringMap(out, props); - - if (childName != null) { - out.writeBoolean(true); - - U.writeString(out, childName); - out.writeObject(childEntry); - } - else - out.writeBoolean(false); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - createTime = in.readLong(); - props = U.readStringMap(in); - - if (in.readBoolean()) { - childName = U.readString(in); - childEntry = (IgfsListingEntry)in.readObject(); - } - } - } - - /** - * File lock entry processor. - */ - private static class FileLockProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Lock Id. */ - private IgniteUuid lockId; - - /** - * Default constructor. - */ - public FileLockProcessor() { - // No-op. - } - - /** - * Constructor. 
- * - * @param lockId Lock ID. - */ - public FileLockProcessor(IgniteUuid lockId) { - this.lockId = lockId; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) - throws EntryProcessorException { - IgfsEntryInfo oldInfo = entry.getValue(); - - IgfsEntryInfo newInfo = oldInfo.lock(lockId); - - entry.setValue(newInfo); - - return newInfo; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, lockId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - lockId = U.readGridUuid(in); - } - } - - /** - * File unlock entry processor. - */ - private static class FileUnlockProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Modification time. */ - private long modificationTime; - - /** - * Default constructor. - */ - public FileUnlockProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param modificationTime Modification time. - */ - public FileUnlockProcessor(long modificationTime) { - this.modificationTime = modificationTime; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) - throws EntryProcessorException { - IgfsEntryInfo old = entry.getValue(); - - entry.setValue(old.unlock(modificationTime)); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(modificationTime); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - modificationTime = in.readLong(); - } - } - - /** - * File reserve space entry processor. 
- */ - private static class FileReserveSpaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Space. */ - private long space; - - /** Affinity range. */ - private IgfsFileAffinityRange affRange; - - /** - * Default constructor. - */ - public FileReserveSpaceProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param space Space. - * @param affRange - */ - public FileReserveSpaceProcessor(long space, IgfsFileAffinityRange affRange) { - this.space = space; - this.affRange = affRange; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) - throws EntryProcessorException { - IgfsEntryInfo oldInfo = entry.getValue(); - - IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); - - newMap.addRange(affRange); - - IgfsEntryInfo newInfo = oldInfo.length(oldInfo.length() + space).fileMap(newMap); - - entry.setValue(newInfo); - - return newInfo; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(space); - out.writeObject(affRange); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - space = in.readLong(); - affRange = (IgfsFileAffinityRange)in.readObject(); - } - } - - /** - * Update properties processor. - */ - private static class UpdatePropertiesProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Properties to be updated. */ - private Map<String, String> props; - - /** - * Constructor. - */ - public UpdatePropertiesProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param props Properties. 
- */ - public UpdatePropertiesProcessor(Map<String, String> props) { - this.props = props; - } - - /** {@inheritDoc} */ - @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) - throws EntryProcessorException { - IgfsEntryInfo oldInfo = entry.getValue(); - - Map<String, String> tmp = oldInfo.properties(); - - tmp = tmp == null ? new GridLeanMap<String, String>(props.size()) : new GridLeanMap<>(tmp); - - for (Map.Entry<String, String> e : props.entrySet()) { - if (e.getValue() == null) - // Remove properties with 'null' values. - tmp.remove(e.getKey()); - else - // Add/overwrite property. - tmp.put(e.getKey(), e.getValue()); - } - - IgfsEntryInfo newInfo = oldInfo.properties(tmp); - - entry.setValue(newInfo); - - return newInfo; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeStringMap(out, props); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - props = U.readStringMap(in); - } - } - - /** - * Update times entry processor. - */ - private static class UpdateTimesProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Access time. */ - private long accessTime; - - /** Modification time. */ - private long modificationTime; - - /** - * Default constructor. - */ - public UpdateTimesProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param accessTime Access time. - * @param modificationTime Modification time. - */ - public UpdateTimesProcessor(long accessTime, long modificationTime) { - this.accessTime = accessTime; - this.modificationTime = modificationTime; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... 
args) - throws EntryProcessorException { - - IgfsEntryInfo oldInfo = entry.getValue(); - - entry.setValue(oldInfo.accessModificationTime(accessTime, modificationTime)); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(accessTime); - out.writeLong(modificationTime); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - accessTime = in.readLong(); - modificationTime = in.readLong(); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java index 2903239..e2fe58d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java @@ -134,7 +134,7 @@ public class IgfsPathIds { * * @return Last ID. 
*/ - @Nullable public IgniteUuid lastId() { + public IgniteUuid lastId() { return ids[ids.length - 1]; } http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java new file mode 100644 index 0000000..ffba042 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java @@ -0,0 +1,117 @@ +package org.apache.ignite.internal.processors.igfs.meta; + +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsListingEntry; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Map; + +/** + * Directory create processor. + */ +public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Create time. */ + private long createTime; + + /** Properties. */ + private Map<String, String> props; + + /** Child name (optional). */ + private String childName; + + /** Child entry (optional). */ + private IgfsListingEntry childEntry; + + /** + * Constructor.
+ */ + public IgfsMetaDirectoryCreateProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param createTime Create time. + * @param props Properties. + */ + public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props) { + this(createTime, props, null, null); + } + + /** + * Constructor. + * + * @param createTime Create time. + * @param props Properties. + * @param childName Child name. + * @param childEntry Child entry. + */ + public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props, String childName, + IgfsListingEntry childEntry) { + this.createTime = createTime; + this.props = props; + this.childName = childName; + this.childEntry = childEntry; + } + + /** {@inheritDoc} */ + @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) + throws EntryProcessorException { + + IgfsEntryInfo info = IgfsUtils.createDirectory( + entry.getKey(), + null, + props, + createTime, + createTime + ); + + if (childName != null) + info = info.listing(Collections.singletonMap(childName, childEntry)); + + entry.setValue(info); + + return info; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(createTime); + U.writeStringMap(out, props); + + if (childName != null) { + out.writeBoolean(true); + + U.writeString(out, childName); + out.writeObject(childEntry); + } + else + out.writeBoolean(false); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + createTime = in.readLong(); + props = U.readStringMap(in); + + if (in.readBoolean()) { + childName = U.readString(in); + childEntry = (IgfsListingEntry)in.readObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java new file mode 100644 index 0000000..ab5cd5d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java @@ -0,0 +1,92 @@ +package org.apache.ignite.internal.processors.igfs.meta; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsListingEntry; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.Map; + +/** + * Update directory listing closure. + */ +public final class IgfsMetaDirectoryListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** File name to add into parent listing. */ + private String fileName; + + /** Listing entry to add. */ + private IgfsListingEntry entry; + + /** + * Empty constructor required for {@link Externalizable}. + * + */ + public IgfsMetaDirectoryListingAddProcessor() { + // No-op. + } + + /** + * Constructs update directory listing closure. + * + * @param fileName File name to add into parent listing. + * @param entry Listing entry to add or remove.
+ */ + public IgfsMetaDirectoryListingAddProcessor(String fileName, IgfsListingEntry entry) { + assert fileName != null; + assert entry != null; + + this.fileName = fileName; + this.entry = entry; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) { + IgfsEntryInfo fileInfo = e.getValue(); + + assert fileInfo.isDirectory(); + + Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); + + // Modify listing in-place. + IgfsListingEntry oldEntry = listing.put(fileName, entry); + + if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId())) + throw new IgniteException("Directory listing contains unexpected file" + + " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry + + ", oldEntry=" + oldEntry + ']'); + + e.setValue(fileInfo.listing(listing)); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, fileName); + out.writeObject(entry); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fileName = U.readString(in); + entry = (IgfsListingEntry)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsMetaDirectoryListingAddProcessor.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java new file mode 100644 index 0000000..181a73e --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java @@ -0,0 +1,89 @@ +package org.apache.ignite.internal.processors.igfs.meta; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsListingEntry; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.Map; + +/** + * Remove entry from directory listing. + */ +public class IgfsMetaDirectoryListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** File name. */ + private String fileName; + + /** Expected ID. */ + private IgniteUuid fileId; + + /** + * Default constructor. + */ + public IgfsMetaDirectoryListingRemoveProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param fileName File name. + * @param fileId File ID. + */ + public IgfsMetaDirectoryListingRemoveProcessor(String fileName, IgniteUuid fileId) { + this.fileName = fileName; + this.fileId = fileId; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... 
args) + throws EntryProcessorException { + IgfsEntryInfo fileInfo = e.getValue(); + + assert fileInfo != null; + assert fileInfo.isDirectory(); + + Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); + + listing.putAll(fileInfo.listing()); + + IgfsListingEntry oldEntry = listing.get(fileName); + + if (oldEntry == null || !oldEntry.fileId().equals(fileId)) + throw new IgniteException("Directory listing doesn't contain expected file" + + " [listing=" + listing + ", fileName=" + fileName + "]"); + + // Modify listing in-place. + listing.remove(fileName); + + e.setValue(fileInfo.listing(listing)); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, fileName); + U.writeGridUuid(out, fileId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fileName = U.readString(in); + fileId = U.readGridUuid(in); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java new file mode 100644 index 0000000..4c4888c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java @@ -0,0 +1,84 @@ +package org.apache.ignite.internal.processors.igfs.meta; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsListingEntry; +import 
org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.Map; + +/** + * Listing replace processor. + */ +public final class IgfsMetaDirectoryListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Name. */ + private String name; + + /** New ID. */ + private IgniteUuid id; + + /** + * Constructor. + */ + public IgfsMetaDirectoryListingReplaceProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param name Name. + * @param id ID. + */ + public IgfsMetaDirectoryListingReplaceProcessor(String name, IgniteUuid id) { + this.name = name; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) + throws EntryProcessorException { + IgfsEntryInfo fileInfo = e.getValue(); + + assert fileInfo.isDirectory(); + + Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); + + // Modify listing in-place. 
+ IgfsListingEntry oldEntry = listing.get(name); + + if (oldEntry == null) + throw new IgniteException("Directory listing doesn't contain expected entry: " + name); + + listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory())); + + e.setValue(fileInfo.listing(listing)); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, name); + out.writeObject(id); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + name = U.readString(in); + id = (IgniteUuid)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java new file mode 100644 index 0000000..a07d764 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java @@ -0,0 +1,110 @@ +package org.apache.ignite.internal.processors.igfs.meta; + +import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; + +/** + * File create processor. 
+ */ +public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Create time. */ + private long createTime; + + /** Properties. */ + private Map<String, String> props; + + /** Block size. */ + private int blockSize; + + /** Affintiy key. */ + private IgniteUuid affKey; + + /** Lcok ID. */ + private IgniteUuid lockId; + + /** Evict exclude flag. */ + private boolean evictExclude; + + /** + * Constructor. + */ + public IgfsMetaFileCreateProcessor() { + // No-op. + } + + /** + * Constructor. + * + * @param createTime Create time. + * @param props Properties. + * @param blockSize Block size. + * @param affKey Affinity key. + * @param lockId Lock ID. + * @param evictExclude Evict exclude flag. + */ + public IgfsMetaFileCreateProcessor(long createTime, Map<String, String> props, int blockSize, + @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude) { + this.createTime = createTime; + this.props = props; + this.blockSize = blockSize; + this.affKey = affKey; + this.lockId = lockId; + this.evictExclude = evictExclude; + } + + /** {@inheritDoc} */ + @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... 
args) + throws EntryProcessorException { + IgfsEntryInfo info = IgfsUtils.createFile( + entry.getKey(), + blockSize, + 0L, + affKey, + lockId, + evictExclude, + props, + createTime, + createTime + ); + + entry.setValue(info); + + return info; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(createTime); + U.writeStringMap(out, props); + out.writeInt(blockSize); + out.writeObject(affKey); + out.writeObject(lockId); + out.writeBoolean(evictExclude); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + createTime = in.readLong(); + props = U.readStringMap(in); + blockSize = in.readInt(); + affKey = (IgniteUuid)in.readObject(); + lockId = (IgniteUuid)in.readObject(); + evictExclude = in.readBoolean(); + } +}
