IGNITE-3961: IGFS: Added IgfsSecondaryFileSystem.affinity() method. This closes #1114. This closes #1252.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2df39a80 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2df39a80 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2df39a80 Branch: refs/heads/ignite-3477 Commit: 2df39a80d80e2575be61a902ccd48615796fcde9 Parents: a61b0ea Author: tledkov-gridgain <tled...@gridgain.com> Authored: Wed Dec 28 16:47:24 2016 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Dec 28 16:47:24 2016 +0300 ---------------------------------------------------------------------- .../igfs/IgfsGroupDataBlocksKeyMapper.java | 17 +- .../igfs/secondary/IgfsSecondaryFileSystem.java | 18 ++ .../local/LocalIgfsSecondaryFileSystem.java | 96 ++++++- .../processors/igfs/IgfsBaseBlockKey.java | 42 +++ .../internal/processors/igfs/IgfsBlockKey.java | 26 +- .../processors/igfs/IgfsBlockLocationImpl.java | 55 ++++ .../processors/igfs/IgfsDataManager.java | 12 +- .../internal/processors/igfs/IgfsImpl.java | 12 +- .../processors/igfs/IgfsKernalContextAware.java | 32 --- .../igfs/IgfsSecondaryFileSystemImpl.java | 7 + .../local/LocalFileSystemBlockKey.java | 103 +++++++ .../LocalFileSystemPositionedReadable.java | 65 +++++ ...fsSecondaryFileSystemPositionedReadable.java | 65 ----- .../processors/resource/GridResourceIoc.java | 6 +- .../resource/GridResourceProcessor.java | 31 ++- .../ignite/resources/FileSystemResource.java | 62 +++++ .../processors/igfs/IgfsAbstractSelfTest.java | 2 +- .../igfs/IgfsDualAbstractSelfTest.java | 14 +- ...fsLocalSecondaryFileSystemProxySelfTest.java | 81 ++++++ ...gfsSecondaryFileSystemInjectionSelfTest.java | 270 +++++++++++++++++++ .../ignite/testsuites/IgniteIgfsTestSuite.java | 3 + .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 21 +- ...doopIgfsSecondaryFileSystemDelegateImpl.java | 61 ++++- .../impl/igfs/Hadoop1OverIgfsProxyTest.java | 67 +++++ .../testsuites/IgniteHadoopTestSuite.java | 2 + 25 files changed, 1031 
insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java index b35b692..09143d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java @@ -18,9 +18,10 @@ package org.apache.ignite.igfs; import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; -import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; +import org.apache.ignite.internal.processors.igfs.IgfsBaseBlockKey; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; /** * {@code IGFS} class providing ability to group file's data blocks together on one node. 
@@ -84,15 +85,17 @@ public class IgfsGroupDataBlocksKeyMapper extends GridCacheDefaultAffinityKeyMap /** {@inheritDoc} */ @Override public Object affinityKey(Object key) { - if (key != null && IgfsBlockKey.class.equals(key.getClass())) { - IgfsBlockKey blockKey = (IgfsBlockKey)key; + if (key instanceof IgfsBaseBlockKey) { + IgfsBaseBlockKey blockKey = (IgfsBaseBlockKey)key; - if (blockKey.affinityKey() != null) - return blockKey.affinityKey(); + IgniteUuid affKey = blockKey.affinityKey(); - long grpId = blockKey.getBlockId() / grpSize; + if (affKey != null) + return affKey; - return blockKey.getFileId().hashCode() + (int)(grpId ^ (grpId >>> 32)); + long grpId = blockKey.blockId() / grpSize; + + return blockKey.fileHash() + (int)(grpId ^ (grpId >>> 32)); } return super.affinityKey(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index 37c9c7d..76ba454 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -21,8 +21,10 @@ import java.io.OutputStream; import java.util.Collection; import java.util.Map; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.jetbrains.annotations.Nullable; /** @@ -202,4 +204,20 @@ public interface IgfsSecondaryFileSystem { * @throws IgniteException If failed. 
*/ public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException; + + /** + * Get affinity block locations for data blocks of the file. In case {@code maxLen} parameter is set and + * particular block location length is greater than this value, block locations will be split into smaller + * chunks. + * + * @param path File path to get affinity for. + * @param start Position in the file to start affinity resolution from. + * @param len Size of data in the file to resolve affinity for. + * @param maxLen Maximum length of a single returned block location length. + * @return Affinity block locations. + * @throws IgniteException In case of error. + * @throws IgfsPathNotFoundException If path doesn't exist. + */ + public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen) + throws IgniteException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java index 18d31de..86f7387 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java @@ -17,10 +17,14 @@ package org.apache.ignite.igfs.secondary.local; +import java.util.ArrayList; import java.nio.file.attribute.BasicFileAttributeView; import java.nio.file.attribute.FileTime; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import 
org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; @@ -29,14 +33,20 @@ import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException; import org.apache.ignite.igfs.IgfsPathNotFoundException; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.IgfsDataManager; +import org.apache.ignite.internal.processors.igfs.IgfsImpl; +import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemBlockKey; import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl; import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemIgfsFile; import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemSizeVisitor; import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemUtils; -import org.apache.ignite.internal.processors.igfs.secondary.local.LocalIgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemPositionedReadable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.FileSystemResource; +import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; import java.io.File; @@ -61,6 +71,16 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li /** Path that will be added to each passed path. */ private String workDir; + /** Logger. */ + @SuppressWarnings("unused") + @LoggerResource + private IgniteLogger log; + + /** IGFS instance. 
*/ + @SuppressWarnings("unused") + @FileSystemResource + private IgfsImpl igfs; + /** * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. * @@ -258,7 +278,7 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li try { FileInputStream in = new FileInputStream(fileForPath(path)); - return new LocalIgfsSecondaryFileSystemPositionedReadable(in, bufSize); + return new LocalFileSystemPositionedReadable(in, bufSize); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to open file for read: " + path); @@ -402,6 +422,78 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li // No-op. } + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, + long maxLen) throws IgniteException { + File f = fileForPath(path); + + if (!f.exists()) + throw new IgfsPathNotFoundException("File not found: " + path); + + // Create fake block & fake affinity for blocks + long blockSize = igfs.configuration().getBlockSize(); + + if (maxLen <= 0) + maxLen = Long.MAX_VALUE; + + assert maxLen > 0 : "maxLen : " + maxLen; + + long end = start + len; + + Collection<IgfsBlockLocation> blocks = new ArrayList<>((int)(len / maxLen)); + + IgfsDataManager data = igfs.context().data(); + + Collection<ClusterNode> lastNodes = null; + + long lastBlockIdx = -1; + + IgfsBlockLocationImpl lastBlock = null; + + for (long offset = start; offset < end; ) { + long blockIdx = offset / blockSize; + + // Each step is min of maxLen and end of block. + long lenStep = Math.min( + maxLen - (lastBlock != null ? lastBlock.length() : 0), + (blockIdx + 1) * blockSize - offset); + + lenStep = Math.min(lenStep, end - offset); + + // Create fake affinity key to map blocks of secondary filesystem to nodes. 
+ LocalFileSystemBlockKey affKey = new LocalFileSystemBlockKey(path, blockIdx); + + if (blockIdx != lastBlockIdx) { + Collection<ClusterNode> nodes = data.affinityNodes(affKey); + + if (!nodes.equals(lastNodes) && lastNodes != null && lastBlock != null) { + blocks.add(lastBlock); + + lastBlock = null; + } + + lastNodes = nodes; + + lastBlockIdx = blockIdx; + } + + if(lastBlock == null) + lastBlock = new IgfsBlockLocationImpl(offset, lenStep, lastNodes); + else + lastBlock.increaseLength(lenStep); + + if (lastBlock.length() == maxLen || lastBlock.start() + lastBlock.length() == end) { + blocks.add(lastBlock); + + lastBlock = null; + } + + offset += lenStep; + } + + return blocks; + } + /** * Get work directory. * http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java new file mode 100644 index 0000000..05ef086 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * The base class to block key that is used by the {@link IgfsGroupDataBlocksKeyMapper} + */ +public interface IgfsBaseBlockKey { + /** + * @return Block ID. + */ + public long blockId(); + + /** + * @return Hash based on a file identifier (path, ID, etc). + */ + public int fileHash(); + + /** + * @return Block affinity key (if any). + */ + @Nullable public IgniteUuid affinityKey(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java index c366ae3..414f6b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java @@ -44,8 +44,8 @@ import org.jetbrains.annotations.Nullable; /** * File's binary data block key. 
*/ -@GridInternal -public final class IgfsBlockKey implements Message, Externalizable, Binarylizable, Comparable<IgfsBlockKey> { +public final class IgfsBlockKey implements IgfsBaseBlockKey, Message, Externalizable, Binarylizable, + Comparable<IgfsBlockKey> { /** */ private static final long serialVersionUID = 0L; @@ -93,13 +93,21 @@ public final class IgfsBlockKey implements Message, Externalizable, Binarylizabl return fileId; } - /** - * @return Block affinity key. - */ - public IgniteUuid affinityKey() { + /** {@inheritDoc} */ + @Override public IgniteUuid affinityKey() { return affKey; } + /** {@inheritDoc} */ + @Override public long blockId() { + return blockId; + } + + /** {@inheritDoc} */ + @Override public int fileHash() { + return fileId.hashCode(); + } + /** * @return Evict exclude flag. */ @@ -107,12 +115,6 @@ public final class IgfsBlockKey implements Message, Externalizable, Binarylizabl return evictExclude; } - /** - * @return Block ID. - */ - public long getBlockId() { - return blockId; - } /** {@inheritDoc} */ @Override public void onAckReceived() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java index 2d4a0af..3f5d9fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.UUID; import 
org.apache.ignite.IgniteCheckedException; @@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; /** * File block location in the grid. @@ -61,6 +63,7 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable, private Collection<String> names; /** */ + @GridToStringInclude private Collection<String> hosts; /** @@ -102,6 +105,44 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable, } /** + * @param start Start. + * @param len Length. + * @param block Block. + */ + public IgfsBlockLocationImpl(long start, long len, IgfsBlockLocation block) { + assert start >= 0; + assert len > 0; + + this.start = start; + this.len = len; + + nodeIds = block.nodeIds(); + names = block.names(); + hosts = block.hosts(); + } + + /** + * @param start Start. + * @param len Length. + * @param names Collection of host:port addresses. + * @param hosts Collection of host:port addresses. + */ + public IgfsBlockLocationImpl(long start, long len, Collection<String> names, Collection<String> hosts) { + assert start >= 0; + assert len > 0; + assert names != null && !names.isEmpty(); + assert hosts != null && !hosts.isEmpty(); + + this.start = start; + this.len = len; + + nodeIds = Collections.emptySet(); + + this.names = names; + this.hosts = hosts; + } + + /** * @return Start position. */ @Override public long start() { @@ -116,6 +157,20 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable, } /** + * @param addLen Length to increase. + */ + public void increaseLength(long addLen) { + len += addLen; + } + + /** + * @param len Block length. + */ + public void length(long len) { + this.len = len; + } + + /** * @return Node IDs. 
*/ @Override public Collection<UUID> nodeIds() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 4490a68..d6297b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -278,6 +278,16 @@ public class IgfsDataManager extends IgfsManager { } /** + * Maps affinity key to node. + * + * @param affinityKey Affinity key to map. + * @return Primary node for this key. + */ + public Collection<ClusterNode> affinityNodes(Object affinityKey) { + return dataCache.affinity().mapKeyToPrimaryAndBackups(affinityKey); + } + + /** * Creates new instance of explicit data streamer. * * @return New instance of data streamer. @@ -1045,7 +1055,7 @@ public class IgfsDataManager extends IgfsManager { // Create non-colocated key. IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null, - colocatedKey.evictExclude(), colocatedKey.getBlockId()); + colocatedKey.evictExclude(), colocatedKey.blockId()); try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { // Lock keys. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 01e434f..59674f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -179,8 +179,11 @@ public final class IgfsImpl implements IgfsEx { data = igfsCtx.data(); secondaryFs = cfg.getSecondaryFileSystem(); - if (secondaryFs instanceof IgfsKernalContextAware) - ((IgfsKernalContextAware)secondaryFs).setKernalContext(igfsCtx.kernalContext()); + if (secondaryFs != null) { + igfsCtx.kernalContext().resource().injectGeneric(secondaryFs); + + igfsCtx.kernalContext().resource().injectFileSystem(secondaryFs, this); + } if (secondaryFs instanceof LifecycleAware) ((LifecycleAware)secondaryFs).start(); @@ -635,7 +638,7 @@ public final class IgfsImpl implements IgfsEx { IgfsFile file = secondaryFs.update(path, props); if (file != null) - return new IgfsFileImpl(secondaryFs.update(path, props), data.groupBlockSize()); + return new IgfsFileImpl(file, data.groupBlockSize()); } return null; @@ -1263,6 +1266,9 @@ public final class IgfsImpl implements IgfsEx { IgfsMode mode = resolveMode(path); + if (mode == PROXY) + return secondaryFs.affinity(path, start, len, maxLen); + // Check memory first. 
IgfsEntryInfo info = meta.infoForPath(path); http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java deleted file mode 100644 index 7f59db4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs; - -import org.apache.ignite.internal.GridKernalContext; - -/** - * Indicates whether particular file system accepts kernal context. - */ -public interface IgfsKernalContextAware { - /** - * Set kernal context. - * - * @param ctx Kernal context. 
- */ - public void setKernalContext(GridKernalContext ctx); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 4e14b46..1c135fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; @@ -121,4 +122,10 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { igfs.setTimes(path, accessTime, modificationTime); } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, + long maxLen) throws IgniteException { + return igfs.affinity(path, start, len, maxLen); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java new file mode 100644 index 0000000..e3990df --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.secondary.local; + +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.processors.igfs.IgfsBaseBlockKey; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; + +/** + * File's binary data block key. + */ +public final class LocalFileSystemBlockKey implements IgfsBaseBlockKey, Comparable<LocalFileSystemBlockKey> { + /** IGFS path. */ + private IgfsPath path; + + /** Block ID. */ + private long blockId; + + /** + * Constructs file's binary data block key. + * + * @param path IGFS path. + * @param blockId Block ID. 
+ */ + public LocalFileSystemBlockKey(IgfsPath path, long blockId) { + assert path != null; + assert blockId >= 0; + + this.path = path; + this.blockId = blockId; + } + + /** {@inheritDoc} */ + @Override public long blockId() { + return blockId; + } + + /** {@inheritDoc} */ + @Override public int fileHash() { + return path.hashCode(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid affinityKey() { + return null; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull LocalFileSystemBlockKey o) { + int res = path.compareTo(o.path); + + if (res != 0) + return res; + + long v1 = blockId; + long v2 = o.blockId; + + if (v1 != v2) + return v1 > v2 ? 1 : -1; + + return 0; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return path.hashCode() + (int)(blockId ^ (blockId >>> 32)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || !(o instanceof LocalFileSystemBlockKey)) + return false; + + LocalFileSystemBlockKey that = (LocalFileSystemBlockKey)o; + + return blockId == that.blockId && path.equals(that.path); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(LocalFileSystemBlockKey.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java new file mode 100644 index 0000000..6bdba95 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.secondary.local; + +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; + +/** + * Positioned readable interface for local secondary file system. + */ +public class LocalFileSystemPositionedReadable extends BufferedInputStream + implements IgfsSecondaryFileSystemPositionedReadable { + /** Last read position. */ + private long lastReadPos; + + /** + * Constructor. + * + * @param in Input stream. + * @param bufSize Buffer size. 
+ */ + public LocalFileSystemPositionedReadable(FileInputStream in, int bufSize) { + super(in, bufSize); + } + + /** {@inheritDoc} */ + @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException { + if (in == null) + throw new IOException("Stream is closed."); + + if (readPos < lastReadPos || readPos + len > lastReadPos + this.buf.length) { + ((FileInputStream)in).getChannel().position(readPos); + + pos = 0; + count = 0; + } + + int bytesRead = read(buf, off, len); + + if (bytesRead != -1) { + // Advance last read position only if we really read some bytes from the stream. + lastReadPos = readPos + bytesRead; + } + + return bytesRead; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java deleted file mode 100644 index ebf56ad..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs.secondary.local; - -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; - -/** - * Positioned readable interface for local secondary file system. - */ -public class LocalIgfsSecondaryFileSystemPositionedReadable extends BufferedInputStream - implements IgfsSecondaryFileSystemPositionedReadable { - /** Last read position. */ - private long lastReadPos; - - /** - * Constructor. - * - * @param in Input stream. - * @param bufSize Buffer size. - */ - public LocalIgfsSecondaryFileSystemPositionedReadable(FileInputStream in, int bufSize) { - super(in, bufSize); - } - - /** {@inheritDoc} */ - @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException { - if (in == null) - throw new IOException("Stream is closed."); - - if (readPos < lastReadPos || readPos + len > lastReadPos + this.buf.length) { - ((FileInputStream)in).getChannel().position(readPos); - - pos = 0; - count = 0; - } - - int bytesRead = read(buf, off, len); - - if (bytesRead != -1) { - // Advance last read position only if we really read some bytes from the stream. 
- lastReadPos = readPos + bytesRead; - } - - return bytesRead; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java index 0158973..07a4fff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.resources.CacheNameResource; import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.FileSystemResource; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.JobContextResource; import org.apache.ignite.resources.LoadBalancerResource; @@ -511,7 +512,10 @@ public class GridResourceIoc { JOB_CONTEXT(JobContextResource.class), /** */ - CACHE_STORE_SESSION(CacheStoreSessionResource.class); + CACHE_STORE_SESSION(CacheStoreSessionResource.class), + + /** */ + FILESYSTEM_RESOURCE(FileSystemResource.class); /** */ public final Class<? 
extends Annotation> clazz; http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java index 84d07b6..bdfbe50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.resource; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Collection; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.cache.store.CacheStoreSession; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobContext; @@ -201,6 +201,26 @@ public class GridResourceProcessor extends GridProcessorAdapter { } /** + * Injects filesystem instance into given object. + * + * @param obj Object. + * @param igfs Ignite filesystem to inject. + * @return {@code True} if filesystem was injected. + * @throws IgniteCheckedException If failed to inject. + */ + public boolean injectFileSystem(Object obj, IgniteFileSystem igfs) throws IgniteCheckedException { + assert obj != null; + + if (log.isDebugEnabled()) + log.debug("Injecting cache store session: " + obj); + + // Unwrap Proxy object. + obj = unwrapTarget(obj); + + return inject(obj, GridResourceIoc.ResourceAnnotation.FILESYSTEM_RESOURCE, null, null, igfs); + } + + /** * @param obj Object to inject. * @throws IgniteCheckedException If failed to inject. 
*/ @@ -308,6 +328,10 @@ public class GridResourceProcessor extends GridProcessorAdapter { res = new GridResourceJobContextInjector((ComputeJobContext)param); break; + case FILESYSTEM_RESOURCE: + res = new GridResourceBasicInjector<>(param); + break; + default: res = injectorByAnnotation[ann.ordinal()]; break; @@ -318,6 +342,11 @@ public class GridResourceProcessor extends GridProcessorAdapter { /** * @param obj Object to inject. + * @param ann Annotation enum. + * @param dep Grid deployment object. + * @param depCls Grid deployment class. + * @param param Resource to inject. + * @return {@code True} if resource was injected. * @throws IgniteCheckedException If failed to inject. */ private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep, http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java b/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java new file mode 100644 index 0000000..e2aa06d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.resources; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotates a field or a setter method for injection of primary Ignite filesystem to a secondary + * filesystem implementation. + * + * <p> + * Here is how injection would typically happen: + * <pre name="code" class="java"> + * public class MySecondaryFS implements IgfsSecondaryFileSystem { + * ... + * // Inject instance of primary filesystem. + * @FileSystemResource + * private IgniteFileSystem igfs; + * ... + * } + * </pre> + * or attach the same annotations to methods: + * <pre name="code" class="java"> + * public class MySecondaryFS implements IgfsSecondaryFileSystem { + * ... + * private IgniteFileSystem igfs; + * ... + * // Inject instance of primary filesystem. + * @FileSystemResource + * public void setIgfs(IgniteFileSystem igfs) { + * this.igfs = igfs; + * } + * ... + * } + * </pre> + * <p> + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.FIELD}) +public @interface FileSystemResource { + // No-op. 
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java index 04f3c8e..d0b700e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java @@ -736,7 +736,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest { if(!propertiesSupported()) return; - if (dual && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) { + if (mode != PRIMARY && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) { // In case of Hadoop dual mode only user name, group name, and permission properties are updated, // an arbitrary named property is just ignored: checkRootPropertyUpdate("foo", "moo", null); http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java index bea318d..7b83cfc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.CyclicBarrier; import static 
org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; +import static org.apache.ignite.igfs.IgfsMode.PROXY; /** * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC. @@ -48,8 +49,6 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { */ protected IgfsDualAbstractSelfTest(IgfsMode mode) { super(mode); - - assert mode == DUAL_SYNC || mode == DUAL_ASYNC; } /** {@inheritDoc} */ @@ -72,10 +71,13 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest { assert igfs.exists(p); assert igfs.modeResolver().resolveMode(gg) == mode; - assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC; - assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC; - assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY; - assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only. + + if (mode != PROXY) { + assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC; + assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC; + assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY; + assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only. 
+ } // All the child paths of "/ignite/" must be visible in listings: assert igfs.listFiles(gg).size() == 3; http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java index e7f9bbb..e7e3ac8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; @@ -58,6 +59,11 @@ public class IgfsLocalSecondaryFileSystemProxySelfTest extends IgfsProxySelfTest /** */ private final File fileLinkSrc = new File(FS_WORK_DIR + File.separatorChar + "file"); + /** {@inheritDoc} */ + @Override protected int nodeCount() { + return 3; + } + /** * Creates secondary filesystems. * @return IgfsSecondaryFileSystem @@ -215,6 +221,81 @@ public class IgfsLocalSecondaryFileSystemProxySelfTest extends IgfsProxySelfTest } /** + * @throws Exception If failed. 
+ */ + public void testAffinityMaxLen() throws Exception { + awaitPartitionMapExchange(); + + long fileSize = 32 * 1024 * 1024; + + IgfsPath filePath = new IgfsPath("/file"); + + try (OutputStream os = igfs.create(filePath, true)) { + for(int i = 0; i < fileSize / chunk.length; ++i) + os.write(chunk); + } + + Collection<IgfsBlockLocation> blocks; + + long len = igfs.info(filePath).length(); + int start = 0; + + // Check default maxLen (maxLen = 0) + for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) { + Collection<IgfsBlockLocation> blocks0 = + igfs.affinity(filePath, start, len, 0); + + blocks = igfs.affinity(filePath, start, len, Long.MAX_VALUE); + + assertTrue(blocks0.size() > 1); + assertEquals(blocks0.size(), blocks.size()); + assertEquals(F.first(blocks).start(), start); + assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length()); + assertEquals(blocks0, blocks); + + len -= 1024 * 2; + start += 1024; + System.out.println("+++ "); + } + + len = igfs.info(filePath).length(); + start = 0; + long maxLen = igfs.context().data().groupBlockSize() * 2; + + // Different cases of start, len and maxLen + for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) { + blocks = igfs.affinity(filePath, start, len, maxLen); + + assertEquals(F.first(blocks).start(), start); + assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length()); + + long totalLen = 0; + + for (IgfsBlockLocation block : blocks) { + totalLen += block.length(); + + assert block.length() <= maxLen : "block.length() <= maxLen. [block.length=" + block.length() + + ", maxLen=" + maxLen + ']'; + + assert block.length() + block.start() <= start + len : "block.length() + block.start() < start + len. 
[block.length=" + block.length() + + ", block.start()=" + block.start() + ", start=" + start +", len=" + len + ']'; + + for (IgfsBlockLocation block0 : blocks) + if (!block0.equals(block)) + assert block.start() < block0.start() && block.start() + block.length() <= block0.start() || + block.start() > block0.start() && block0.start() + block0.length() <= block.start() + : "Blocks cross each other: block0=" + block + ", block1= " + block0; + } + + assert totalLen == len : "Summary length of blocks must be: " + len + " actual: " + totalLen; + + len -= 1024 * 2; + start += 1024; + maxLen -= igfs.context().data().groupBlockSize() * 2 / 1024; + } + } + + /** * * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java new file mode 100644 index 0000000..d4187cb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import java.io.OutputStream; +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.resources.FileSystemResource; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * Tests for resource injection to secondary file system. 
+ */ +public class IgfsSecondaryFileSystemInjectionSelfTest extends GridCommonAbstractTest { + /** IGFS name. */ + protected static final String IGFS_NAME = "igfs-test"; + + /** Test implementation of secondary filesystem. */ + private TestBaseSecondaryFsMock secondary; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + G.stopAll(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(IGFS_NAME); + igfsCfg.setDefaultMode(IgfsMode.DUAL_SYNC); + igfsCfg.setSecondaryFileSystem(secondary); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setFileSystemConfiguration(igfsCfg); + cfg.setCacheConfiguration(metaCacheCfg, dataCacheCfg); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings({"UnusedDeclaration"}) + public void testInjectPrimaryByField() throws Exception { + secondary = new TestBaseSecondaryFsMock() { + @FileSystemResource + private IgfsImpl igfs; + + @LoggerResource + private IgniteLogger log; + + @IgniteInstanceResource + private Ignite ig; + + @Override boolean checkInjection(Ignite ignite, IgniteFileSystem primary) { + return igfs == primary && log instanceof IgniteLogger && ig == ignite; + } + }; + + Ignite ig = startGrid(0); + + IgniteFileSystem igfs = ig.fileSystem(IGFS_NAME); + + assert secondary.checkInjection(ig, igfs); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"UnusedDeclaration"}) + public void testInjectPrimaryByMethods() throws Exception { + secondary = new TestBaseSecondaryFsMock() { + /** Ignite instance. */ + private Ignite ig; + + /** IGFS instance. */ + private IgniteFileSystem igfs; + + /** Logger injected flag */ + private boolean logSet; + + /** + * @param igfs Primary IGFS. + */ + @FileSystemResource + void setPrimaryIgfs(IgfsImpl igfs) { + this.igfs = igfs; + } + + /** + * @param log Ignite logger. + */ + @LoggerResource + void setIgLogger(IgniteLogger log) { + logSet = log instanceof IgniteLogger; + } + + /** + * @param ig Ignite instance. 
+ */ + @IgniteInstanceResource + void setIgniteInst(Ignite ig) { + this.ig = ig; + } + + @Override boolean checkInjection(Ignite ignite, IgniteFileSystem primary) { + return ignite == ig && primary == igfs && logSet; + } + }; + + Ignite ig = startGrid(0); + + IgniteFileSystem igfs = ig.fileSystem(IGFS_NAME); + + assert secondary.checkInjection(ig, igfs); + } + + /** + * + */ + private static abstract class TestBaseSecondaryFsMock implements IgfsSecondaryFileSystem { + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return false; + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) throws IgniteException { + return false; + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) throws IgniteException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override + public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override + public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, long blockSize, + @Nullable Map<String, String> props) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() throws IgniteException { + return 0; + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, + long maxLen) throws IgniteException { + return null; + } + + /** + * @param ignite Ignite instance. + * @param primary Primary IGFS. + * @return {@code True} if injection is correct. 
+ */ + abstract boolean checkInjection(Ignite ignite, IgniteFileSystem primary); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java index 775c2ce..76ed440 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsProcessorValidationSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsProxySelfTest; import org.apache.ignite.internal.processors.igfs.IgfsLocalSecondaryFileSystemProxySelfTest; +import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemInjectionSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsSizeSelfTest; import org.apache.ignite.internal.processors.igfs.IgfsStartCacheTest; @@ -167,6 +168,8 @@ public class IgniteIgfsTestSuite extends TestSuite { suite.addTestSuite(IgfsAtomicPrimaryOffheapTieredSelfTest.class); suite.addTestSuite(IgfsAtomicPrimaryOffheapValuesSelfTest.class); + suite.addTestSuite(IgfsSecondaryFileSystemInjectionSelfTest.class); + return suite; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index d9215db..674cca7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -20,20 +20,22 @@ package org.apache.ignite.hadoop.fs; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware; import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate; -import org.apache.ignite.internal.processors.igfs.IgfsKernalContextAware; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; import java.io.OutputStream; @@ -46,8 +48,8 @@ import java.util.concurrent.Callable; * <p> * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}. 
*/ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, IgfsKernalContextAware, - LifecycleAware, HadoopPayloadAware { +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware, + HadoopPayloadAware { /** The default user name. It is used if no user context is set. */ private String dfltUsrName; @@ -245,8 +247,17 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys } /** {@inheritDoc} */ - @Override public void setKernalContext(GridKernalContext ctx) { - this.ctx = ctx; + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, + long maxLen) throws IgniteException { + return target.affinity(path, start, len, maxLen); + } + + /** + * @param ignite Ignite instance. + */ + @IgniteInstanceResource + public void setIgniteInstance(IgniteEx ignite) { + ctx = ignite.context(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java index e336fad..fe6492e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.hadoop.impl.delegate; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.fs.BlockLocation; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; @@ -28,6 +31,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; @@ -42,6 +46,7 @@ import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFac import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate; import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProperties; import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl; import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; import org.apache.ignite.internal.processors.igfs.IgfsUtils; @@ -104,12 +109,17 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco final FileSystem fileSys = fileSystemForUser(); + Path hadoopPath = convert(path); + try { + if (!fileSys.exists(hadoopPath)) + return null; + if (props0.userName() != null || props0.groupName() != null) - fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); + fileSys.setOwner(hadoopPath, props0.userName(), props0.groupName()); if (props0.permission() != null) - fileSys.setPermission(convert(path), props0.permission()); + fileSys.setPermission(hadoopPath, props0.permission()); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); @@ -266,7 +276,14 @@ public class 
HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable Map<String, String> props) { try { - return fileSystemForUser().append(convert(path), bufSize); + Path hadoopPath = convert(path); + + FileSystem fs = fileSystemForUser(); + + if (create && !fs.exists(hadoopPath)) + return fs.create(hadoopPath, false, bufSize); + else + return fs.append(convert(path), bufSize); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); @@ -371,6 +388,24 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco } /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, + long maxLen) throws IgniteException { + try { + BlockLocation[] hadoopBlocks = fileSystemForUser().getFileBlockLocations(convert(path), start, len); + + List<IgfsBlockLocation> blks = new ArrayList<>(hadoopBlocks.length); + + for (int i = 0; i < hadoopBlocks.length; ++i) + blks.add(convertBlockLocation(hadoopBlocks[i])); + + return blks; + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed affinity for path: " + path); + } + } + + /** {@inheritDoc} */ public void start() { factory.start(); } @@ -393,6 +428,25 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco } /** + * Convert Hadoop affinity block location into IGFS affinity block location. + * + * @param block Hadoop affinity block location. + * @return IGFS affinity block location. 
+ */ + private IgfsBlockLocation convertBlockLocation(BlockLocation block) { + try { + String[] names = block.getNames(); + String[] hosts = block.getHosts(); + + return new IgfsBlockLocationImpl( + block.getOffset(), block.getLength(), + Arrays.asList(names), Arrays.asList(hosts)); + } catch (IOException e) { + throw handleSecondaryFsError(e, "Failed convert block location: " + block); + } + } + + /** * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. * * @param e Exception to check. @@ -406,6 +460,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco /** * Cast IO exception to IGFS exception. * + * @param msg Error message. * @param e IO exception. * @return IGFS exception. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java new file mode 100644 index 0000000..c7c792d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.impl.igfs; + +import java.io.OutputStream; +import java.util.Collection; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.util.typedef.F; + +/** + * PROXY mode test. + */ +public class Hadoop1OverIgfsProxyTest extends Hadoop1DualAbstractTest { + /** + * Constructor. + */ + public Hadoop1OverIgfsProxyTest() { + super(IgfsMode.PROXY); + } + + /** + * @throws Exception If failed. + */ + public void testAffinity() throws Exception { + long fileSize = 32 * 1024 * 1024; + + IgfsPath filePath = new IgfsPath("/file"); + + try (OutputStream os = igfs.create(filePath, true)) { + for(int i = 0; i < fileSize / chunk.length; ++i) + os.write(chunk); + } + + long len = igfs.info(filePath).length(); + int start = 0; + + // Check default maxLen (maxLen = 0) + for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) { + Collection<IgfsBlockLocation> blocks = igfs.affinity(filePath, start, len); + + assertEquals(F.first(blocks).start(), start); + assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length()); + + len -= 1024 * 2; + start += 1024; + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 6046cc1..01893fb 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProt import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopTxConfigCacheTest; import org.apache.ignite.internal.processors.hadoop.impl.fs.KerberosHadoopFileSystemFactorySelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.igfs.Hadoop1OverIgfsProxyTest; import org.apache.ignite.internal.processors.hadoop.impl.util.BasicUserNameMapperSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.util.ChainedUserNameMapperSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.util.KerberosUserNameMapperSelfTest; @@ -136,6 +137,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsProxyTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName())));