IGNITE-2813: IGFS: Optimized metadata values by splitting file and directory into separate classes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4794f87b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4794f87b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4794f87b Branch: refs/heads/ignite-1786 Commit: 4794f87b5ebe2865b6afa541ce601df42fb8f6e3 Parents: 4af5316 Author: vozerov-gridgain <[email protected]> Authored: Fri Mar 18 17:45:48 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Mar 18 17:45:48 2016 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsDataManager.java | 103 ++-- .../processors/igfs/IgfsDeleteWorker.java | 27 +- .../processors/igfs/IgfsDirectoryInfo.java | 210 ++++++++ .../internal/processors/igfs/IgfsEntryInfo.java | 305 +++++++++++ .../internal/processors/igfs/IgfsFileImpl.java | 15 +- .../internal/processors/igfs/IgfsFileInfo.java | 500 +++---------------- .../processors/igfs/IgfsFileWorkerBatch.java | 75 +-- .../IgfsFileWorkerBatchCancelledException.java | 51 ++ .../igfs/IgfsFragmentizerManager.java | 64 ++- .../internal/processors/igfs/IgfsImpl.java | 28 +- .../processors/igfs/IgfsInputStreamAdapter.java | 5 +- .../processors/igfs/IgfsInputStreamImpl.java | 32 +- .../processors/igfs/IgfsIpcHandler.java | 16 +- .../processors/igfs/IgfsListingEntry.java | 12 +- .../processors/igfs/IgfsMetaManager.java | 432 ++++++++-------- .../processors/igfs/IgfsOutputStreamImpl.java | 19 +- .../internal/processors/igfs/IgfsPathIds.java | 4 +- .../processors/igfs/IgfsPathsCreateResult.java | 6 +- .../IgfsSecondaryInputStreamDescriptor.java | 6 +- .../IgfsSecondaryOutputStreamDescriptor.java | 9 +- .../internal/processors/igfs/IgfsUtils.java | 68 +++ .../internal/processors/igfs/package-info.java | 2 +- .../igfs/IgfsFragmentizerAbstractSelfTest.java | 4 +- .../processors/igfs/IgfsAbstractSelfTest.java | 4 +- .../igfs/IgfsDataManagerSelfTest.java | 55 +- .../igfs/IgfsDualAbstractSelfTest.java 
| 17 +- .../processors/igfs/IgfsFileInfoSelfTest.java | 31 +- .../igfs/IgfsMetaManagerSelfTest.java | 34 +- .../processors/igfs/IgfsProcessorSelfTest.java | 31 +- .../processors/igfs/IgfsSizeSelfTest.java | 35 +- .../processors/igfs/IgfsStreamsSelfTest.java | 25 +- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 32 +- .../igfs/HadoopIgfsDualAbstractSelfTest.java | 11 +- .../IgniteHadoopFileSystemLoggerSelfTest.java | 21 +- .../testsuites/IgniteHadoopTestSuite.java | 26 +- 35 files changed, 1321 insertions(+), 994 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 125d728..3825086 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -17,35 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.DataInput; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Deque; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import 
java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -87,6 +58,36 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import java.io.DataInput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; @@ -368,7 
+369,7 @@ public class IgfsDataManager extends IgfsManager { * @return Requested data block or {@code null} if nothing found. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsFileInfo fileInfo, final IgfsPath path, + @Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path, final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader) throws IgniteCheckedException { //assert validTxState(any); // Allow this method call for any transaction state. @@ -476,7 +477,7 @@ public class IgfsDataManager extends IgfsManager { * @param fileInfo File info of file opened to write. * @return Future that will be completed when all ack messages are received or when write failed. */ - public IgniteInternalFuture<Boolean> writeStart(IgfsFileInfo fileInfo) { + public IgniteInternalFuture<Boolean> writeStart(IgfsEntryInfo fileInfo) { WriteCompletionFuture fut = new WriteCompletionFuture(fileInfo.id()); WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileInfo.id(), fut); @@ -495,7 +496,7 @@ public class IgfsDataManager extends IgfsManager { * * @param fileInfo File info being written. */ - public void writeClose(IgfsFileInfo fileInfo) { + public void writeClose(IgfsEntryInfo fileInfo) { WriteCompletionFuture fut = pendingWrites.get(fileInfo.id()); if (fut != null) @@ -524,7 +525,7 @@ public class IgfsDataManager extends IgfsManager { * @throws IgniteCheckedException If failed. */ @Nullable public byte[] storeDataBlocks( - IgfsFileInfo fileInfo, + IgfsEntryInfo fileInfo, long reservedLen, @Nullable byte[] remainder, int remainderLen, @@ -557,7 +558,7 @@ public class IgfsDataManager extends IgfsManager { * @throws IOException If store failed. 
*/ @Nullable public byte[] storeDataBlocks( - IgfsFileInfo fileInfo, + IgfsEntryInfo fileInfo, long reservedLen, @Nullable byte[] remainder, int remainderLen, @@ -579,7 +580,7 @@ public class IgfsDataManager extends IgfsManager { * @param fileInfo File details to remove data for. * @return Delete future that will be completed when file is actually erased. */ - public IgniteInternalFuture<Object> delete(IgfsFileInfo fileInfo) { + public IgniteInternalFuture<Object> delete(IgfsEntryInfo fileInfo) { if (!fileInfo.isFile()) { if (log.isDebugEnabled()) log.debug("Cannot delete content of not-data file: " + fileInfo); @@ -595,7 +596,7 @@ public class IgfsDataManager extends IgfsManager { * @param fileInfo File info. * @return Block key. */ - public IgfsBlockKey blockKey(long blockIdx, IgfsFileInfo fileInfo) { + public IgfsBlockKey blockKey(long blockIdx, IgfsEntryInfo fileInfo) { if (fileInfo.affinityKey() != null) return new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(), fileInfo.evictExclude(), blockIdx); @@ -616,7 +617,7 @@ public class IgfsDataManager extends IgfsManager { * @param range Range to clean up. * @param cleanNonColocated {@code True} if all blocks should be cleaned. */ - public void cleanBlocks(IgfsFileInfo fileInfo, IgfsFileAffinityRange range, boolean cleanNonColocated) { + public void cleanBlocks(IgfsEntryInfo fileInfo, IgfsFileAffinityRange range, boolean cleanNonColocated) { long startIdx = range.startOffset() / fileInfo.blockSize(); long endIdx = range.endOffset() / fileInfo.blockSize(); @@ -646,7 +647,7 @@ public class IgfsDataManager extends IgfsManager { * @param fileInfo File info to move data for. * @param range Range to move. 
*/ - public void spreadBlocks(IgfsFileInfo fileInfo, IgfsFileAffinityRange range) { + public void spreadBlocks(IgfsEntryInfo fileInfo, IgfsFileAffinityRange range) { long startIdx = range.startOffset() / fileInfo.blockSize(); long endIdx = range.endOffset() / fileInfo.blockSize(); @@ -721,7 +722,7 @@ public class IgfsDataManager extends IgfsManager { * @return Affinity blocks locations. * @throws IgniteCheckedException If failed. */ - public Collection<IgfsBlockLocation> affinity(IgfsFileInfo info, long start, long len) + public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len) throws IgniteCheckedException { return affinity(info, start, len, 0); } @@ -736,7 +737,7 @@ public class IgfsDataManager extends IgfsManager { * @return Affinity blocks locations. * @throws IgniteCheckedException If failed. */ - public Collection<IgfsBlockLocation> affinity(IgfsFileInfo info, long start, long len, long maxLen) + public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen) throws IgniteCheckedException { assert validTxState(false); assert info.isFile() : "Failed to get affinity (not a file): " + info; @@ -845,7 +846,7 @@ public class IgfsDataManager extends IgfsManager { * @param maxLen Maximum allowed split length. * @param res Result collection to add regions to. */ - private void affinity0(IgfsFileInfo info, long start, long len, long maxLen, Deque<IgfsBlockLocation> res) { + private void affinity0(IgfsEntryInfo info, long start, long len, long maxLen, Deque<IgfsBlockLocation> res) { long firstGrpIdx = start / grpBlockSize; long limitGrpIdx = (start + len + grpBlockSize - 1) / grpBlockSize; @@ -1327,7 +1328,7 @@ public class IgfsDataManager extends IgfsManager { */ private IgfsBlockKey createBlockKey( long block, - IgfsFileInfo fileInfo, + IgfsEntryInfo fileInfo, IgfsFileAffinityRange locRange ) { // If affinityKey is present, return block key as is. 
@@ -1373,7 +1374,7 @@ public class IgfsDataManager extends IgfsManager { * @return Data remainder if {@code flush} flag is {@code false}. */ @Nullable public byte[] storeDataBlocks( - IgfsFileInfo fileInfo, + IgfsEntryInfo fileInfo, long reservedLen, @Nullable byte[] remainder, final int remainderLen, @@ -1632,10 +1633,10 @@ public class IgfsDataManager extends IgfsManager { */ private class AsyncDeleteWorker extends GridWorker { /** File info for stop request. */ - private final IgfsFileInfo stopInfo = new IgfsFileInfo(); + private final IgfsEntryInfo stopInfo; /** Delete requests queue. */ - private BlockingQueue<IgniteBiTuple<GridFutureAdapter<Object>, IgfsFileInfo>> delReqs = + private BlockingQueue<IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo>> delReqs = new LinkedBlockingQueue<>(); /** @@ -1645,6 +1646,10 @@ public class IgfsDataManager extends IgfsManager { */ protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) { super(gridName, name, log); + + long time = System.currentTimeMillis(); + + stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid()); } /** @@ -1658,7 +1663,7 @@ public class IgfsDataManager extends IgfsManager { * @param info File info to delete. * @return Future which completes when entry is actually removed. 
*/ - private IgniteInternalFuture<Object> deleteAsync(IgfsFileInfo info) { + private IgniteInternalFuture<Object> deleteAsync(IgfsEntryInfo info) { GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); delReqs.offer(F.t(fut, info)); @@ -1670,10 +1675,10 @@ public class IgfsDataManager extends IgfsManager { @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { try { while (!isCancelled()) { - IgniteBiTuple<GridFutureAdapter<Object>, IgfsFileInfo> req = delReqs.take(); + IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo> req = delReqs.take(); GridFutureAdapter<Object> fut = req.get1(); - IgfsFileInfo fileInfo = req.get2(); + IgfsEntryInfo fileInfo = req.get2(); // Identity check. if (fileInfo == stopInfo) { @@ -1734,7 +1739,7 @@ public class IgfsDataManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Stopping asynchronous igfs file delete thread: " + name()); - IgniteBiTuple<GridFutureAdapter<Object>, IgfsFileInfo> req = delReqs.poll(); + IgniteBiTuple<GridFutureAdapter<Object>, IgfsEntryInfo> req = delReqs.poll(); while (req != null) { req.get1().onCancelled(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java index ffddd3e..f6b26ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java @@ -17,13 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import 
java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -37,6 +30,14 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; @@ -162,7 +163,7 @@ public class IgfsDeleteWorker extends IgfsThread { * @param trashId Trash ID. */ private void delete(IgniteUuid trashId) { - IgfsFileInfo info = null; + IgfsEntryInfo info = null; try { info = meta.info(trashId); @@ -220,7 +221,7 @@ public class IgfsDeleteWorker extends IgfsThread { assert id != null; while (true) { - IgfsFileInfo info = meta.info(id); + IgfsEntryInfo info = meta.info(id); if (info != null) { if (info.isDirectory()) { @@ -234,7 +235,7 @@ public class IgfsDeleteWorker extends IgfsThread { assert info.isFile(); // Lock the file with special lock Id to prevent concurrent writing: - IgfsFileInfo lockedInfo = meta.lock(id, true); + IgfsEntryInfo lockedInfo = meta.lock(id, true); if (lockedInfo == null) return false; // File is locked, we cannot delete it. 
@@ -271,7 +272,7 @@ public class IgfsDeleteWorker extends IgfsThread { assert id != null; while (true) { - IgfsFileInfo info = meta.info(id); + IgfsEntryInfo info = meta.info(id); if (info != null) { assert info.isDirectory(); @@ -298,12 +299,12 @@ public class IgfsDeleteWorker extends IgfsThread { failedFiles++; } else { - IgfsFileInfo fileInfo = meta.info(entry.getValue().fileId()); + IgfsEntryInfo fileInfo = meta.info(entry.getValue().fileId()); if (fileInfo != null) { assert fileInfo.isFile(); - IgfsFileInfo lockedInfo = meta.lock(fileInfo.id(), true); + IgfsEntryInfo lockedInfo = meta.lock(fileInfo.id(), true); if (lockedInfo == null) // File is already locked: http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java new file mode 100644 index 0000000..01c8ff9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDirectoryInfo.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Map; + +/** + * IGFS directory info. + */ +public class IgfsDirectoryInfo extends IgfsEntryInfo { + /** */ + private static final long serialVersionUID = 0L; + + /** Directory listing. */ + @GridToStringInclude + private Map<String, IgfsListingEntry> listing; + + /** + * {@link Externalizable} support. + */ + public IgfsDirectoryInfo() { + // No-op. + } + + /** + * Update length. + * + * @param len New length. + * @return Updated file info. + */ + public IgfsEntryInfo length(long len) { + throw new UnsupportedOperationException("length"); + } + + /** {@inheritDoc} */ + @Override public IgfsDirectoryInfo listing(@Nullable Map<String, IgfsListingEntry> listing) { + IgfsDirectoryInfo res = copy(); + + res.listing = listing; + + return res; + } + + /** {@inheritDoc} */ + @Override public IgfsEntryInfo lock(IgniteUuid lockId) { + throw new UnsupportedOperationException("lock"); + } + + /** {@inheritDoc} */ + @Override public IgfsEntryInfo unlock(long modificationTime) { + throw new UnsupportedOperationException("unlock"); + } + + /** {@inheritDoc} */ + @Override public IgfsEntryInfo fileMap(IgfsFileMap fileMap) { + throw new UnsupportedOperationException("fileMap"); + } + + /** + * Constructs file info. + * + * @param id ID or {@code null} to generate it automatically. + * @param listing Directory listing. + * @param props File properties. 
+ * @param accessTime Last access time. + * @param modificationTime Last modification time. + */ + IgfsDirectoryInfo(IgniteUuid id, @Nullable Map<String, IgfsListingEntry> listing, + @Nullable Map<String, String> props, long accessTime, long modificationTime) { + super(id, props, accessTime, modificationTime); + + this.listing = listing; + } + + /** {@inheritDoc} */ + protected IgfsDirectoryInfo copy() { + return new IgfsDirectoryInfo(id, listing, props, accessTime, modificationTime); + } + + /** {@inheritDoc} */ + public boolean isFile() { + return false; + } + + /** {@inheritDoc} */ + public long length() { + return 0; + } + + /** {@inheritDoc} */ + public int blockSize() { + return 0; + } + + /** {@inheritDoc} */ + public long blocksCount() { + return 0; + } + + /** {@inheritDoc} */ + public Map<String, IgfsListingEntry> listing() { + return listing != null ? listing : Collections.<String, IgfsListingEntry>emptyMap(); + } + + /** {@inheritDoc} */ + public boolean hasChildren() { + return !F.isEmpty(listing); + } + + /** {@inheritDoc} */ + public boolean hasChild(String name) { + return listing != null && listing.containsKey(name); + } + + /** {@inheritDoc} */ + public boolean hasChild(String name, IgniteUuid expId) { + if (listing != null) { + IgfsListingEntry entry = listing.get(name); + + if (entry != null) + return F.eq(expId, entry.fileId()); + } + + return false; + } + + /** {@inheritDoc} */ + @Nullable public IgniteUuid affinityKey() { + return null; + } + + /** {@inheritDoc} */ + public IgfsFileMap fileMap() { + return null; + } + + /** {@inheritDoc} */ + @Nullable public IgniteUuid lockId() { + return null; + } + + /** {@inheritDoc} */ + public boolean evictExclude() { + return true; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(listing); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput 
in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + listing = (Map<String, IgfsListingEntry>)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode() ^ (props == null ? 0 : props.hashCode()); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + IgfsDirectoryInfo that = (IgfsDirectoryInfo)obj; + + return id.equals(that.id) && F.eq(props, that.props); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsDirectoryInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java new file mode 100644 index 0000000..c5b1111 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEntryInfo.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Map; + +/** + * Base IGFS entry. + */ +public abstract class IgfsEntryInfo implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** ID. */ + protected IgniteUuid id; + + /** Properties. */ + protected Map<String, String> props; + + /** Last access time. */ + protected long accessTime; + + /** Last modification time. */ + protected long modificationTime; + + /** Original file path. This is a helper field used only during real file delete. */ + protected IgfsPath path; + + /** + * Default constructor. + */ + protected IgfsEntryInfo() { + // No-op. + } + + /** + * Constructor. + * + * @param id ID. + * @param props Properties. + * @param accessTime Access time. + * @param modificationTime Modification time. + */ + protected IgfsEntryInfo(IgniteUuid id, @Nullable Map<String, String> props, long accessTime, + long modificationTime) { + assert id != null; + + this.id = id; + this.props = props == null || props.isEmpty() ? null : props; + this.accessTime = accessTime; + this.modificationTime = modificationTime; + } + + /** + * Gets this item ID. + * + * @return This item ID. + */ + public IgniteUuid id() { + return id; + } + + /** + * Get properties of the file. + * + * @return Properties of the file. + */ + public Map<String, String> properties() { + return props == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(props); + } + + /** + * @return Last access time. 
+ */ + public long accessTime() { + return accessTime; + } + + /** + * @return Last modification time. + */ + public long modificationTime() { + return modificationTime; + } + + /** + * @return Original file path. This is a helper field used only in some operations like delete. + */ + public IgfsPath path() { + return path; + } + + /** + * @return {@code True} if this is a file. + */ + public abstract boolean isFile(); + + /** + * Update length. + * + * @param len New length. + * @return Updated file info. + */ + public abstract IgfsEntryInfo length(long len); + + /** + * Update listing. + * + * @param listing Listing. + * @return Updated file info. + */ + public abstract IgfsEntryInfo listing(@Nullable Map<String, IgfsListingEntry> listing); + + /** + * Update properties. + * + * @param props Properties. + * @return Updated file info. + */ + public IgfsEntryInfo properties(@Nullable Map<String, String> props) { + IgfsEntryInfo res = copy(); + + res.props = props; + + return res; + } + + /** + * Update path. + * + * @param path Path. + * @return Updated file info. + */ + public IgfsEntryInfo path(IgfsPath path) { + IgfsEntryInfo res = copy(); + + res.path = path; + + return res; + } + + /** + * Update access and modification time. + * + * @param accessTime Access time. + * @param modificationTime Modification time. + * @return Updated file info. + */ + public IgfsEntryInfo accessModificationTime(long accessTime, long modificationTime) { + IgfsEntryInfo res = copy(); + + res.accessTime = accessTime; + res.modificationTime = modificationTime; + + return res; + } + + /** + * Lock file. + * + * @param lockId Lock ID. + * @return Update file info. + */ + public abstract IgfsEntryInfo lock(IgniteUuid lockId); + + /** + * Unlock file. + * + * @param modificationTime Modification time. + * @return Updated file info. + */ + public abstract IgfsEntryInfo unlock(long modificationTime); + + /** + * Update file map. + * + * @param fileMap File affinity map. 
+ * @return Updated file info. + */ + public abstract IgfsEntryInfo fileMap(IgfsFileMap fileMap); + + /** + * Copy file info. + * + * @return Copy. + */ + protected abstract IgfsEntryInfo copy(); + + /** + * @return {@code True} if this is a directory. + */ + public boolean isDirectory() { + return !isFile(); + } + + /** + * Get file size. + * + * @return File size. + */ + public abstract long length(); + + /** + * Get single data block size to store this file. + * + * @return Single data block size to store this file. + */ + public abstract int blockSize(); + + /** + * @return Number of data blocks to store this file. + */ + public abstract long blocksCount(); + + /** + * @return Directory listing. + */ + public abstract Map<String, IgfsListingEntry> listing(); + + /** + * @return {@code True} if at least one child exists. + */ + public abstract boolean hasChildren(); + + /** + * @param name Child name. + * @return {@code True} if child with such name exists. + */ + public abstract boolean hasChild(String name); + + /** + * @param name Child name. + * @param expId Expected child ID. + * @return {@code True} if child with such name exists. + */ + public abstract boolean hasChild(String name, IgniteUuid expId); + + /** + * @return Affinity key used for single-node file collocation. If {@code null}, usual + * mapper procedure is used for block affinity detection. + */ + @Nullable public abstract IgniteUuid affinityKey(); + + /** + * @return File affinity map. + */ + public abstract IgfsFileMap fileMap(); + + /** + * Get lock ID. + * + * @return Lock ID if file is locked or {@code null} if file is free of locks. + */ + @Nullable public abstract IgniteUuid lockId(); + + /** + * Get evict exclude flag. + * + * @return Evict exclude flag. 
+ */ + public abstract boolean evictExclude(); + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, id); + U.writeStringMap(out, props); + out.writeLong(accessTime); + out.writeLong(modificationTime); + out.writeObject(path); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = U.readGridUuid(in); + props = U.readStringMap(in); + accessTime = in.readLong(); + modificationTime = in.readLong(); + path = (IgfsPath)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java index 3576a06..be8d0fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java @@ -17,12 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collections; -import java.util.Map; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.util.typedef.internal.A; @@ -31,6 +25,13 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Map; + /** * File or directory information. 
*/ @@ -97,7 +98,7 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable { * * @param path Path. */ - public IgfsFileImpl(IgfsPath path, IgfsFileInfo info, long globalGrpBlockSize) { + public IgfsFileImpl(IgfsPath path, IgfsEntryInfo info, long globalGrpBlockSize) { A.notNull(path, "path"); A.notNull(info, "info"); http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java index 13c54ff..30f505e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileInfo.java @@ -17,44 +17,32 @@ package org.apache.ignite.internal.processors.igfs; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collections; import java.util.Map; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; /** - * Unmodifiable file information. 
+ * IGFS file info. */ -public final class IgfsFileInfo implements Externalizable { +public final class IgfsFileInfo extends IgfsEntryInfo { /** */ private static final long serialVersionUID = 0L; - /** Special access time value, indicating that the modification time value should be taken. */ - private static final long ACCESS_TIME_TAKE_MODIFICATION_TIME = -1L; - - /** Info ID. */ - private IgniteUuid id; - /** File length in bytes. */ private long len; /** File block size, {@code zero} for directories. */ private int blockSize; - /** File properties. */ - private Map<String, String> props; - /** File lock ID. */ private IgniteUuid lockId; @@ -64,491 +52,181 @@ public final class IgfsFileInfo implements Externalizable { /** File affinity map. */ private IgfsFileMap fileMap; - /** Last access time. Modified on-demand. */ - private long accessTime; - - /** Last modification time. */ - private long modificationTime; - - /** Directory listing. */ - @GridToStringInclude - private Map<String, IgfsListingEntry> listing; - /** Whether data blocks of this entry should never be excluded. */ private boolean evictExclude; /** - * Original file path. This is a helper field used only in some - * operations like delete. - */ - private IgfsPath path; - - /** * {@link Externalizable} support. */ public IgfsFileInfo() { - this(IgfsUtils.ROOT_ID); + // No-op. } - /** - * Constructs directory file info with the given ID. - * - * @param id ID. - */ - IgfsFileInfo(IgniteUuid id) { - this(true, id, 0, 0, null, null, null, null, false, ACCESS_TIME_TAKE_MODIFICATION_TIME, - System.currentTimeMillis(), false); - } + /** {@inheritDoc} */ + @Override public IgfsFileInfo length(long len) { + IgfsFileInfo res = copy(); - /** - * Constructs directory or file info with - * {@link org.apache.ignite.configuration.FileSystemConfiguration#DFLT_BLOCK_SIZE default} block size. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. 
- * @param props Meta properties to set. - * @param accessTime The access time. - * @param modificationTime The modification time. - */ - public IgfsFileInfo(boolean isDir, @Nullable Map<String, String> props, long accessTime, long modificationTime) { - this(isDir, null, isDir ? 0 : FileSystemConfiguration.DFLT_BLOCK_SIZE, 0, null, null, props, null, false, - accessTime, modificationTime, false); - } + res.len = len; - /** - * Consturcts directory with random ID and provided listing. - * - * @param listing Listing. - */ - IgfsFileInfo(Map<String, IgfsListingEntry> listing) { - this(listing, (Map<String,String>)null); + return res; } - /** - * Consturcts directory with random ID, provided listing and properties. - * - * @param listing Listing. - * @param props The properties to set for the new directory. - */ - IgfsFileInfo(@Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String,String> props) { - this(true/*dir*/, null, 0, 0, null, listing, props, null, false, ACCESS_TIME_TAKE_MODIFICATION_TIME, - System.currentTimeMillis(), false); + /** {@inheritDoc} */ + @Override public IgfsEntryInfo listing(@Nullable Map<String, IgfsListingEntry> listing) { + throw new UnsupportedOperationException("listing"); } - /** - * Constructs file info. - * - * @param blockSize Block size. - * @param len Length. - * @param affKey Affinity key. - * @param lockId Lock ID. - * @param props Properties. - * @param evictExclude Evict exclude flag. - * @param accessTime The access time. - * @param modificationTime The modification time. 
- */ - public IgfsFileInfo(int blockSize, long len, @Nullable IgniteUuid affKey, @Nullable IgniteUuid lockId, - boolean evictExclude, @Nullable Map<String, String> props, long accessTime, long modificationTime) { - this(false, null, blockSize, len, affKey, null, props, lockId, true, accessTime, modificationTime, - evictExclude); - } + /** {@inheritDoc} */ + @Override public IgfsFileInfo lock(IgniteUuid lockId) { + assert lockId != null; + assert this.lockId == null; - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param len Size of a file. - */ - IgfsFileInfo(IgfsFileInfo info, long len) { - this(info.isDirectory(), info.id, info.blockSize, len, info.affKey, info.listing, info.props, info.fileMap(), - info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } + IgfsFileInfo res = copy(); - /** - * Constructs file info. - * - * @param info File info. - * @param accessTime Last access time. - * @param modificationTime Last modification time. - */ - IgfsFileInfo(IgfsFileInfo info, long accessTime, long modificationTime) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), info.lockId, false, accessTime, modificationTime, info.evictExclude()); - } + res.lockId = lockId; - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param props File properties to set. - */ - IgfsFileInfo(IgfsFileInfo info, @Nullable Map<String, String> props) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, props, - info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); + return res; } - /** - * Constructs file info. - * - * @param blockSize Block size, - * @param len Size of a file. - * @param props File properties to set. - * @param evictExclude Evict exclude flag. - * @param accessTime The access time. 
- * @param modificationTime The modification time. - */ - IgfsFileInfo(int blockSize, long len, boolean evictExclude, @Nullable Map<String, String> props, - long accessTime, long modificationTime) { - this(blockSize == 0, // NB The contract is: (blockSize == 0) <=> isDirectory() - null, blockSize, len, null, null, props, null, true, accessTime, modificationTime, evictExclude); - } + /** {@inheritDoc} */ + @Override public IgfsFileInfo unlock(long modificationTime) { + IgfsFileInfo res = copy(); - /** - * Constructs file information. - * - * @param info File information to copy data from. - * @param lockId Lock ID. - * @param modificationTime Last modification time. - */ - IgfsFileInfo(IgfsFileInfo info, @Nullable IgniteUuid lockId, long modificationTime) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), lockId, true, info.accessTime, modificationTime, info.evictExclude()); + res.lockId = null; + res.modificationTime = modificationTime; + + return res; } - /** - * Constructs file info. - * - * @param listing New directory listing. - * @param old Old file info. - */ - IgfsFileInfo(@Nullable Map<String, IgfsListingEntry> listing, IgfsFileInfo old) { - this(old.isDirectory(), old.id, old.blockSize, old.len, old.affKey, listing, old.props, old.fileMap(), - old.lockId, false, old.accessTime, old.modificationTime, old.evictExclude()); + /** {@inheritDoc} */ + @Override public IgfsFileInfo fileMap(IgfsFileMap fileMap) { + IgfsFileInfo res = copy(); + + res.fileMap = fileMap; + + return res; } - /** - * Constructs file info. - * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. - * @param id ID or {@code null} to generate it automatically. - * @param blockSize Block size. - * @param len Size of a file. - * @param affKey Affinity key for data blocks. - * @param listing Directory listing. - * @param props File properties. - * @param lockId Lock ID. 
- * @param cpProps Flag to copy properties map. - * @param accessTime The access time. - * @param modificationTime Last modification time. - * @param evictExclude Evict exclude flag. - */ - private IgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, - @Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String, String> props, - @Nullable IgniteUuid lockId, boolean cpProps, long accessTime, long modificationTime, boolean evictExclude) { - this(isDir, id, blockSize, len, affKey, listing, props, null, lockId, cpProps, accessTime, - modificationTime, evictExclude); + /** {@inheritDoc} */ + @Override protected IgfsFileInfo copy() { + return new IgfsFileInfo(id, blockSize, len, affKey, props, fileMap, lockId, accessTime, modificationTime, + evictExclude); } /** * Constructs file info. * - * @param isDir Constructs directory info if {@code true} or file info if {@code false}. * @param id ID or {@code null} to generate it automatically. * @param blockSize Block size. * @param len Size of a file. * @param affKey Affinity key for data blocks. - * @param listing Directory listing. * @param props File properties. * @param fileMap File map. * @param lockId Lock ID. - * @param cpProps Flag to copy properties map. * @param accessTime Last access time. * @param modificationTime Last modification time. * @param evictExclude Evict exclude flag. 
*/ - private IgfsFileInfo(boolean isDir, @Nullable IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, - @Nullable Map<String, IgfsListingEntry> listing, @Nullable Map<String, String> props, - @Nullable IgfsFileMap fileMap, @Nullable IgniteUuid lockId, boolean cpProps, long accessTime, - long modificationTime, boolean evictExclude) { - assert F.isEmpty(listing) || isDir; - - if (isDir) { - assert len == 0 : "Directory length should be zero: " + len; - assert blockSize == 0 : "Directory block size should be zero: " + blockSize; - } - else { - assert len >= 0 : "File length cannot be negative: " + len; - assert blockSize > 0 : "File block size should be positive: " + blockSize; - } - - this.id = id == null ? IgniteUuid.randomUuid() : id; - this.len = isDir ? 0 : len; - this.blockSize = isDir ? 0 : blockSize; + IgfsFileInfo(IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey, + @Nullable Map<String, String> props, @Nullable IgfsFileMap fileMap, @Nullable IgniteUuid lockId, + long accessTime, long modificationTime, boolean evictExclude) { + super(id, props, accessTime, modificationTime); + + this.len = len; + this.blockSize = blockSize; this.affKey = affKey; - this.listing = listing; - if (fileMap == null && !isDir) + if (fileMap == null) fileMap = new IgfsFileMap(); this.fileMap = fileMap; - this.accessTime = accessTime == ACCESS_TIME_TAKE_MODIFICATION_TIME ? modificationTime : accessTime; - this.modificationTime = modificationTime; - - // Always make a copy of passed properties collection to escape concurrent modifications. - this.props = props == null || props.isEmpty() ? null : - cpProps ? new GridLeanMap<>(props) : props; - this.lockId = lockId; this.evictExclude = evictExclude; } - /** - * A copy constructor, which takes all data from the specified - * object field-by-field. - * - * @param info An object to copy data info. 
- */ - public IgfsFileInfo(IgfsFileInfo info) { - this(info.isDirectory(), info.id, info.blockSize, info.len, info.affKey, info.listing, info.props, - info.fileMap(), info.lockId, true, info.accessTime, info.modificationTime, info.evictExclude()); - } - - /** - * Creates a builder for the new instance of file info. - * - * @return A builder to construct a new unmodifiable instance - * of this class. - */ - public static Builder builder() { - return new Builder(new IgfsFileInfo()); - } - - /** - * Creates a builder for the new instance of file info, - * based on the specified origin. - * - * @param origin An origin for new instance, from which - * the data will be copied. - * @return A builder to construct a new unmodifiable instance - * of this class. - */ - public static Builder builder(IgfsFileInfo origin) { - return new Builder(new IgfsFileInfo(origin)); - } - - /** - * Gets this item ID. - * - * @return This item ID. - */ - public IgniteUuid id() { - return id; - } - - /** - * Temporal hack to change ID before saving entry to cache. Currently we have too much constructors and adding - * more will make things even worse. Instead, we use this method until directories and files are split into - * separate entities. - * - * @param id ID. - * @deprecated Use only on not-yet-saved entries. - */ - @Deprecated - public void id(IgniteUuid id) { - this.id = id; - } - - /** - * @return {@code True} if this is a file. - */ + /** {@inheritDoc} */ public boolean isFile() { - return blockSize > 0; - } - - /** - * @return {@code True} if this is a directory. - */ - public boolean isDirectory() { - return blockSize == 0; + return true; } - /** - * Get file size. - * - * @return File size. - */ + /** {@inheritDoc} */ public long length() { - assert isFile(); - return len; } - /** - * Get single data block size to store this file. - * - * @return Single data block size to store this file. 
- */ + /** {@inheritDoc} */ public int blockSize() { - assert isFile(); - return blockSize; } - /** - * @return Number of data blocks to store this file. - */ + /** {@inheritDoc} */ public long blocksCount() { - assert isFile(); - return (len + blockSize() - 1) / blockSize(); } - /** - * @return Last access time. - */ - public long accessTime() { - return accessTime; - } - - /** - * @return Last modification time. - */ - public long modificationTime() { - return modificationTime; - } - - /** - * @return Directory listing. - */ + /** {@inheritDoc} */ public Map<String, IgfsListingEntry> listing() { - return listing != null ? listing : Collections.<String, IgfsListingEntry>emptyMap(); + return Collections.emptyMap(); } - /** - * @return {@code True} if at least one child exists. - */ + /** {@inheritDoc} */ public boolean hasChildren() { - return !F.isEmpty(listing); + return false; } - /** - * @param name Child name. - * @return {@code True} if child with such name exists. - */ + /** {@inheritDoc} */ public boolean hasChild(String name) { - return listing != null && listing.containsKey(name); + return false; } - /** - * @param name Child name. - * @param expId Expected child ID. - * @return {@code True} if child with such name exists. - */ + /** {@inheritDoc} */ public boolean hasChild(String name, IgniteUuid expId) { - if (listing != null) { - IgfsListingEntry entry = listing.get(name); - - if (entry != null) - return F.eq(expId, entry.fileId()); - } - return false; } - /** - * @return Affinity key used for single-node file collocation. If {@code null}, usual - * mapper procedure is used for block affinity detection. - */ + /** {@inheritDoc} */ @Nullable public IgniteUuid affinityKey() { return affKey; } - /** - * @param affKey Affinity key used for single-node file collocation. - */ - public void affinityKey(IgniteUuid affKey) { - this.affKey = affKey; - } - - /** - * @return File affinity map. 
- */ + /** {@inheritDoc} */ public IgfsFileMap fileMap() { return fileMap; } - /** - * @param fileMap File affinity map. - */ - public void fileMap(IgfsFileMap fileMap) { - this.fileMap = fileMap; - } - - /** - * Get properties of the file. - * - * @return Properties of the file. - */ - public Map<String, String> properties() { - return props == null || props.isEmpty() ? Collections.<String, String>emptyMap() : - Collections.unmodifiableMap(props); - } - - /** - * Get lock ID. - * - * @return Lock ID if file is locked or {@code null} if file is free of locks. - */ + /** {@inheritDoc} */ @Nullable public IgniteUuid lockId() { return lockId; } - /** - * Get evict exclude flag. - * - * @return Evict exclude flag. - */ + /** {@inheritDoc} */ public boolean evictExclude() { return evictExclude; } - /** - * @return Original file path. This is a helper field used only in some operations like delete. - */ - public IgfsPath path() { - return path; - } - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, id); + super.writeExternal(out); + out.writeInt(blockSize); out.writeLong(len); - U.writeStringMap(out, props); U.writeGridUuid(out, lockId); U.writeGridUuid(out, affKey); - out.writeObject(listing); out.writeObject(fileMap); - out.writeLong(accessTime); - out.writeLong(modificationTime); out.writeBoolean(evictExclude); - out.writeObject(path); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = U.readGridUuid(in); + super.readExternal(in); + blockSize = in.readInt(); len = in.readLong(); - props = U.readStringMap(in); lockId = U.readGridUuid(in); affKey = U.readGridUuid(in); - listing = (Map<String, IgfsListingEntry>)in.readObject(); fileMap = (IgfsFileMap)in.readObject(); - accessTime = in.readLong(); - modificationTime = in.readLong(); evictExclude = in.readBoolean(); - path = 
(IgfsPath)in.readObject(); } /** {@inheritDoc} */ @@ -575,42 +253,4 @@ public final class IgfsFileInfo implements Externalizable { @Override public String toString() { return S.toString(IgfsFileInfo.class, this); } - - /** - * Builder for {@link IgfsFileInfo}. - */ - @SuppressWarnings("PublicInnerClass") - public static class Builder { - /** Instance to build. */ - private final IgfsFileInfo info; - - /** - * Private constructor. - * - * @param info Instance to build. - */ - private Builder(IgfsFileInfo info) { - this.info = info; - } - - /** - * @param path A new path value. - * @return This builder instance (for chaining). - */ - public Builder path(IgfsPath path) { - info.path = path; - - return this; - } - - /** - * Finishes instance construction and returns a resulting - * unmodifiable instance. - * - * @return A constructed instance. - */ - public IgfsFileInfo build() { - return info; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java index 130846b..17875a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatch.java @@ -17,22 +17,23 @@ package org.apache.ignite.internal.processors.igfs; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; + import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.BlockingDeque; 
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.U; /** * Work batch is an abstraction of the logically grouped tasks. */ public abstract class IgfsFileWorkerBatch implements Runnable { /** Stop marker. */ - private static final byte[] STOP_MARKER = new byte[0]; + private static final byte[] FINISH_MARKER = new byte[0]; /** Cancel marker. */ private static final byte[] CANCEL_MARKER = new byte[0]; @@ -49,9 +50,6 @@ public abstract class IgfsFileWorkerBatch implements Runnable { /** Output stream to the file. */ private final OutputStream out; - /** Cancel flag. */ - private volatile boolean cancelled; - /** Finishing flag. */ private volatile boolean finishing; @@ -76,45 +74,55 @@ public abstract class IgfsFileWorkerBatch implements Runnable { * @return {@code True} in case write was enqueued. */ synchronized boolean write(final byte[] data) { - if (!finishing) { - queue.add(data); - - return true; - } - else - return false; + return offer(data, false, false); } /** * Add the last task to that batch which will release all the resources. + * + * @return {@code True} if finish was signalled. */ - synchronized void finish() { - if (!finishing) { - finishing = true; - - queue.add(STOP_MARKER); - } + synchronized boolean finish() { + return offer(FINISH_MARKER, false, true); } /** * Cancel batch processing. + * + * @return {@code True} if cancel was signalled. */ - synchronized void cancel() { - queue.addFirst(CANCEL_MARKER); + synchronized boolean cancel() { + return offer(CANCEL_MARKER, true, true); } /** - * @return {@code True} if finish was called on this batch. + * Add request to queue. + * + * @param data Data. + * @param head Whether to add to head. + * @param finish Whether this is the last batch to be accepted. 
+ * @return {@code True} if request was added to queue. */ - boolean finishing() { - return finishing; + private synchronized boolean offer(byte[] data, boolean head, boolean finish) { + if (finishing) + return false; + + if (head) + queue.addFirst(data); + else + queue.addLast(data); + + if (finish) + finishing = true; + + return true; } /** - * @return {@code True} if batch write was terminated abruptly due to explicit cancellation. + * @return {@code True} if finish was called on this batch. */ - boolean cancelled() { - return cancelled; + boolean finishing() { + return finishing; } /** @@ -129,16 +137,13 @@ public abstract class IgfsFileWorkerBatch implements Runnable { try { byte[] data = queue.poll(1000, TimeUnit.MILLISECONDS); - if (data == STOP_MARKER) { + if (data == FINISH_MARKER) { assert queue.isEmpty(); break; } - else if (data == CANCEL_MARKER) { - cancelled = true; - - throw new IgniteCheckedException("Write to file was cancelled due to node stop."); - } + else if (data == CANCEL_MARKER) + throw new IgfsFileWorkerBatchCancelledException(path); else if (data != null) { try { out.write(data); http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java new file mode 100644 index 0000000..4ae6964 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileWorkerBatchCancelledException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsPath; + +/** + * Exception indicating that file batch processing was cancelled. + */ +public class IgfsFileWorkerBatchCancelledException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** Path. */ + private IgfsPath path; + + /** + * Default constructor. + */ + public IgfsFileWorkerBatchCancelledException() { + // No-op. 
+ } + + public IgfsFileWorkerBatchCancelledException(IgfsPath path) { + this.path = path; + } + + /** {@inheritDoc} */ + @Override public String getMessage() { + if (path == null) + return "Asynchronous file processing was cancelled due to node stop."; + else + return "Asynchronous file processing was cancelled due to node stop: " + path; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java index 7cc5cb6..194a8ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java @@ -17,23 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -60,6 +43,23 @@ import org.jetbrains.annotations.Nullable; import javax.cache.processor.EntryProcessor; import 
javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -274,7 +274,7 @@ public class IgfsFragmentizerManager extends IgfsManager { Collection<IgfsFileAffinityRange> ranges = req.fragmentRanges(); IgniteUuid fileId = req.fileId(); - IgfsFileInfo fileInfo = igfsCtx.meta().info(fileId); + IgfsEntryInfo fileInfo = igfsCtx.meta().info(fileId); if (fileInfo == null) { if (log.isDebugEnabled()) @@ -288,7 +288,7 @@ public class IgfsFragmentizerManager extends IgfsManager { for (IgfsFileAffinityRange range : ranges) { try { - IgfsFileInfo updated; + IgfsEntryInfo updated; switch (range.status()) { case RANGE_STATUS_INITIAL: { @@ -345,7 +345,7 @@ public class IgfsFragmentizerManager extends IgfsManager { /** * Update range processor. 
*/ - private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>, + private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -375,17 +375,15 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args) + @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args) throws EntryProcessorException { - IgfsFileInfo oldInfo = entry.getValue(); + IgfsEntryInfo oldInfo = entry.getValue(); IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); newMap.updateRangeStatus(range, status); - IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length()); - - newInfo.fileMap(newMap); + IgfsEntryInfo newInfo = oldInfo.fileMap(newMap); entry.setValue(newInfo); @@ -413,7 +411,7 @@ public class IgfsFragmentizerManager extends IgfsManager { /** * Delete range processor. */ - private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>, + private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -438,17 +436,15 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args) + @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... 
args) throws EntryProcessorException { - IgfsFileInfo oldInfo = entry.getValue(); + IgfsEntryInfo oldInfo = entry.getValue(); IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap()); newMap.deleteRange(range); - IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length()); - - newInfo.fileMap(newMap); + IgfsEntryInfo newInfo = oldInfo.fileMap(newMap); entry.setValue(newInfo); @@ -507,7 +503,7 @@ public class IgfsFragmentizerManager extends IgfsManager { // If we have room for files, add them to fragmentizer. try { while (fragmentingFiles.size() < igfsCtx.configuration().getFragmentizerConcurrentFiles()) { - IgfsFileInfo fileInfo = fileForFragmentizer(fragmentingFiles.keySet()); + IgfsEntryInfo fileInfo = fileForFragmentizer(fragmentingFiles.keySet()); // If no colocated files found, exit loop. if (fileInfo == null) @@ -715,7 +711,7 @@ public class IgfsFragmentizerManager extends IgfsManager { * * @param fileInfo File info to process. */ - private void requestFragmenting(IgfsFileInfo fileInfo) { + private void requestFragmenting(IgfsEntryInfo fileInfo) { IgfsFileMap map = fileInfo.fileMap(); assert map != null && !map.ranges().isEmpty(); @@ -789,7 +785,7 @@ public class IgfsFragmentizerManager extends IgfsManager { * @return File ID to process or {@code null} if there are no such files. * @throws IgniteCheckedException In case of error. */ - @Nullable private IgfsFileInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException { + @Nullable private IgfsEntryInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException { return fragmentizerEnabled ? 
igfsCtx.meta().fileForFragmentizer(exclude) : null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 9ec583c..398428a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -622,7 +622,7 @@ public final class IgfsImpl implements IgfsEx { await(path); - IgfsFileInfo info = meta.updateDual(secondaryFs, path, props); + IgfsEntryInfo info = meta.updateDual(secondaryFs, path, props); if (info == null) return null; @@ -637,7 +637,7 @@ public final class IgfsImpl implements IgfsEx { if (fileId == null) return null; - IgfsFileInfo info = meta.updateProperties(fileId, props); + IgfsEntryInfo info = meta.updateProperties(fileId, props); if (info != null) { if (evts.isRecordable(EVT_IGFS_META_UPDATED)) @@ -691,7 +691,7 @@ public final class IgfsImpl implements IgfsEx { return null; } - IgfsFileInfo info = meta.move(src, dest); + IgfsEntryInfo info = meta.move(src, dest); int evtTyp = info.isFile() ? EVT_IGFS_FILE_RENAMED : EVT_IGFS_DIR_RENAMED; @@ -858,7 +858,7 @@ public final class IgfsImpl implements IgfsEx { IgniteUuid fileId = meta.fileId(path); if (fileId != null) { - IgfsFileInfo info = meta.info(fileId); + IgfsEntryInfo info = meta.info(fileId); // Handle concurrent deletion. if (info != null) { @@ -869,7 +869,7 @@ public final class IgfsImpl implements IgfsEx { // Perform the listing. 
for (Map.Entry<String, IgfsListingEntry> e : info.listing().entrySet()) { - IgfsFileInfo childInfo = meta.info(e.getValue().fileId()); + IgfsEntryInfo childInfo = meta.info(e.getValue().fileId()); if (childInfo != null) { IgfsPath childPath = new IgfsPath(path, e.getKey()); @@ -933,7 +933,7 @@ public final class IgfsImpl implements IgfsEx { return os; } - IgfsFileInfo info = meta.info(meta.fileId(path)); + IgfsEntryInfo info = meta.info(meta.fileId(path)); if (info == null) { checkConflictWithPrimary(path); @@ -1034,7 +1034,7 @@ public final class IgfsImpl implements IgfsEx { else dirProps = fileProps = new HashMap<>(props); - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create( + IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.create( path, dirProps, overwrite, @@ -1112,7 +1112,7 @@ public final class IgfsImpl implements IgfsEx { else dirProps = fileProps = new HashMap<>(props); - IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.append( + IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.append( path, dirProps, create, @@ -1194,7 +1194,7 @@ public final class IgfsImpl implements IgfsEx { // Check memory first. IgniteUuid fileId = meta.fileId(path); - IgfsFileInfo info = meta.info(fileId); + IgfsEntryInfo info = meta.info(fileId); if (info == null && mode != PRIMARY) { assert mode == DUAL_SYNC || mode == DUAL_ASYNC; @@ -1291,7 +1291,7 @@ public final class IgfsImpl implements IgfsEx { private void summary0(IgniteUuid fileId, IgfsPathSummary sum) throws IgniteCheckedException { assert sum != null; - IgfsFileInfo info = meta.info(fileId); + IgfsEntryInfo info = meta.info(fileId); if (info != null) { if (info.isDirectory()) { @@ -1401,7 +1401,7 @@ public final class IgfsImpl implements IgfsEx { List<IgniteUuid> ids = meta.fileIds(path); - IgfsFileInfo fileInfo = meta.info(ids.get(ids.size() - 1)); + IgfsEntryInfo fileInfo = meta.info(ids.get(ids.size() - 1)); if (fileInfo == null) return null; // File does not exist. 
@@ -1571,7 +1571,7 @@ public final class IgfsImpl implements IgfsEx { assert path != null; assert mode != null; - IgfsFileInfo info = null; + IgfsEntryInfo info = null; switch (mode) { case PRIMARY: @@ -1698,7 +1698,7 @@ public final class IgfsImpl implements IgfsEx { * @param mode IGFS mode. * @param batch Optional secondary file system batch. */ - IgfsEventAwareOutputStream(IgfsPath path, IgfsFileInfo fileInfo, + IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo, IgniteUuid parentId, int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch) { super(igfsCtx, path, fileInfo, parentId, bufSize, mode, batch, metrics); @@ -1737,7 +1737,7 @@ public final class IgfsImpl implements IgfsEx { * @param secReader Optional secondary file system reader. * @param metrics Metrics. */ - IgfsEventAwareInputStream(IgfsContext igfsCtx, IgfsPath path, IgfsFileInfo fileInfo, + IgfsEventAwareInputStream(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks, int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, IgfsLocalMetrics metrics) { super(igfsCtx, path, fileInfo, prefetchBlocks, seqReadsBeforePrefetch, secReader, metrics); http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java index afcece7..07ab051 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java @@ -17,10 +17,11 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.IOException; import 
org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import java.io.IOException; + /** * Implementation adapter providing necessary methods. */ @@ -36,7 +37,7 @@ public abstract class IgfsInputStreamAdapter extends IgfsInputStream * * @return File info. */ - public abstract IgfsFileInfo fileInfo(); + public abstract IgfsEntryInfo fileInfo(); /** * Reads bytes from given position. http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java index 62f8034..5d41543 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java @@ -17,16 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.EOFException; -import java.io.IOException; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.igfs.IgfsCorruptedFileException; @@ -42,6 +32,17 @@ import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * Input stream to read data from grid cache with separate blocks. */ @@ -66,7 +67,7 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { protected final IgfsPath path; /** File descriptor. */ - private volatile IgfsFileInfo fileInfo; + private volatile IgfsEntryInfo fileInfo; /** The number of already read bytes. Important! Access to the property is guarded by this object lock. */ private long pos; @@ -121,7 +122,7 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { * @param secReader Optional secondary file system reader. * @param metrics Local IGFS metrics. */ - IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsFileInfo fileInfo, int prefetchBlocks, + IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks, int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, IgfsLocalMetrics metrics) { assert igfsCtx != null; assert path != null; @@ -157,7 +158,7 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { } /** {@inheritDoc} */ - @Override public IgfsFileInfo fileInfo() { + @Override public IgfsEntryInfo fileInfo() { return fileInfo; } @@ -404,7 +405,7 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { // This failure may be caused by file being fragmented. if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) { - IgfsFileInfo newInfo = meta.info(fileInfo.id()); + IgfsEntryInfo newInfo = meta.info(fileInfo.id()); // File was deleted. if (newInfo == null) @@ -534,7 +535,8 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { * @return Requested data block or {@code null} if nothing found. * @throws IgniteCheckedException If failed. 
*/ - @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsFileInfo fileInfo, long blockIdx) throws IgniteCheckedException { + @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx) + throws IgniteCheckedException { return data.dataBlock(fileInfo, path, blockIdx, secReader); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 0ed7c0d..eadbdb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -17,11 +17,6 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.Closeable; -import java.io.DataInput; -import java.io.IOException; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -46,6 +41,12 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import java.io.Closeable; +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; + /** * IGFS IPC handler. 
*/ @@ -342,10 +343,7 @@ class IgfsIpcHandler implements IgfsServerHandler { log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, - igfsIn.fileInfo().modificationTime()); - - res.response(new IgfsInputStreamDescriptor(streamId, info.length())); + res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.fileInfo().length())); break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4794f87b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java index 4fe0dca..ea05ca3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsListingEntry.java @@ -17,16 +17,16 @@ package org.apache.ignite.internal.processors.igfs; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; - import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + /** * Directory listing entry. */ @@ -52,7 +52,7 @@ public class IgfsListingEntry implements Externalizable { * * @param fileInfo File info to construct listing entry from. 
*/ - public IgfsListingEntry(IgfsFileInfo fileInfo) { + public IgfsListingEntry(IgfsEntryInfo fileInfo) { id = fileInfo.id(); dir = fileInfo.isDirectory(); }
