IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16c5a715 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16c5a715 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16c5a715 Branch: refs/heads/ignite-3661 Commit: 16c5a715889322d31ed95a2a29206d3a909aa7b7 Parents: 43f65fe Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Sep 13 18:00:31 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Sep 13 18:00:31 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/igfs/IgfsAsyncImpl.java | 7 +-- .../ignite/internal/processors/igfs/IgfsEx.java | 10 ---- .../internal/processors/igfs/IgfsImpl.java | 11 +++-- .../processors/igfs/IgfsInputStreamAdapter.java | 51 -------------------- .../processors/igfs/IgfsInputStreamImpl.java | 17 +++++-- .../processors/igfs/IgfsIpcHandler.java | 7 +-- .../igfs/IgfsSecondaryFileSystemImpl.java | 2 +- .../processors/igfs/IgfsMetricsSelfTest.java | 21 +++----- .../internal/processors/igfs/IgfsMock.java | 8 +-- .../hadoop/igfs/HadoopIgfsInProc.java | 12 ++--- .../hadoop/HadoopCommandLineTest.java | 4 +- 11 files changed, 47 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java index 07b070e..743601e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsOutputStream; @@ -125,18 +126,18 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, + @Override public IgfsInputStream open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) { return igfs.open(path, bufSize, seqReadsBeforePrefetch); } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) { + @Override public IgfsInputStream open(IgfsPath path) { return igfs.open(path); } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { + @Override public IgfsInputStream open(IgfsPath path, int bufSize) { return igfs.open(path, bufSize); } http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 9760f43..05e157d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -49,16 +49,6 @@ public interface IgfsEx extends IgniteFileSystem { */ public IgfsPaths proxyPaths(); - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) - throws IgniteException; - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException; - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException; - /** * Gets global space counters. * http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 2720f24..2c1f0f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.events.IgfsEvent; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsException; import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsInvalidPathException; import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMode; @@ -948,24 +949,24 @@ public final class IgfsImpl implements IgfsEx { } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) { + @Override public IgfsInputStream open(IgfsPath path) { return open(path, cfg.getStreamBufferSize(), cfg.getSequentialReadsBeforePrefetch()); } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { + @Override public IgfsInputStream open(IgfsPath path, int bufSize) { return open(path, bufSize, cfg.getSequentialReadsBeforePrefetch()); } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(final IgfsPath path, final int bufSize, + @Override public IgfsInputStream open(final IgfsPath path, final int bufSize, final int seqReadsBeforePrefetch) { A.notNull(path, "path"); A.ensure(bufSize >= 0, "bufSize >= 0"); A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0"); - return safeOp(new Callable<IgfsInputStreamAdapter>() { - @Override public IgfsInputStreamAdapter call() throws Exception { + return safeOp(new Callable<IgfsInputStream>() { + @Override public IgfsInputStream call() throws Exception { if (log.isDebugEnabled()) log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java deleted file mode 100644 index 07ab051..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs; - -import org.apache.ignite.igfs.IgfsInputStream; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; - -import java.io.IOException; - -/** - * Implementation adapter providing necessary methods. - */ -public abstract class IgfsInputStreamAdapter extends IgfsInputStream - implements IgfsSecondaryFileSystemPositionedReadable { - /** {@inheritDoc} */ - @Override public long length() { - return fileInfo().length(); - } - - /** - * Gets file info for opened file. - * - * @return File info. - */ - public abstract IgfsEntryInfo fileInfo(); - - /** - * Reads bytes from given position. - * - * @param pos Position to read from. - * @param len Number of bytes to read. - * @return Array of chunks with respect to chunk file representation. - * @throws IOException If read failed. - */ - public abstract byte[][] readChunks(long pos, int len) throws IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java index ca2f9f7..f20a423 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java @@ -46,7 +46,7 @@ import java.util.concurrent.locks.ReentrantLock; /** * Input stream to read data from grid cache with separate blocks. */ -public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { +public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondaryFileSystemPositionedReadable { /** Empty chunks result. */ private static final byte[][] EMPTY_CHUNKS = new byte[0][]; @@ -158,8 +158,8 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { } /** {@inheritDoc} */ - @Override public IgfsEntryInfo fileInfo() { - return fileInfo; + @Override public long length() { + return fileInfo.length(); } /** {@inheritDoc} */ @@ -234,9 +234,16 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter { return readFromStore(pos, buf, off, len); } - /** {@inheritDoc} */ + /** + * Reads bytes from given position. + * + * @param pos Position to read from. + * @param len Number of bytes to read. + * @return Array of chunks with respect to chunk file representation. + * @throws IOException If read failed. + */ @SuppressWarnings("IfMayBeConditional") - @Override public synchronized byte[][] readChunks(long pos, int len) throws IOException { + public synchronized byte[][] readChunks(long pos, int len) throws IOException { // Readable bytes in the file, starting from the specified position. long readable = fileInfo.length() - pos; http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index a888aff..6047604 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; import org.apache.ignite.igfs.IgfsOutOfSpaceException; import org.apache.ignite.igfs.IgfsOutputStream; @@ -381,7 +382,7 @@ class IgfsIpcHandler implements IgfsServerHandler { break; case OPEN_READ: { - IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : + IgfsInputStream igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); long streamId = registerResource(ses, igfsIn); @@ -390,7 +391,7 @@ class IgfsIpcHandler implements IgfsServerHandler { log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.fileInfo().length())); + res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.length())); break; } @@ -514,7 +515,7 @@ class IgfsIpcHandler implements IgfsServerHandler { long pos = req.position(); int size = req.length(); - IgfsInputStreamAdapter igfsIn = (IgfsInputStreamAdapter)resource(ses, rsrcId); + IgfsInputStreamImpl igfsIn = (IgfsInputStreamImpl)resource(ses, rsrcId); if (igfsIn == null) throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId); http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 453682c..526e60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -86,7 +86,7 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystemV2 { /** {@inheritDoc} */ @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) throws IgniteException { - return igfs.open(path, bufSize); + return (IgfsSecondaryFileSystemPositionedReadable)igfs.open(path, bufSize); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java index 38cfc00..8d742fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java @@ -116,6 +116,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { * @return Configuration. * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private IgniteConfiguration primaryConfiguration(int idx) throws Exception { FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); @@ -172,6 +173,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { * * @throws Exception If failed. */ + @SuppressWarnings("unchecked") private void startSecondary() throws Exception { FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); @@ -384,6 +386,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { * * @throws Exception If failed. */ + @SuppressWarnings({"ResultOfMethodCallIgnored", "ConstantConditions"}) public void testBlockMetrics() throws Exception { IgfsEx igfs = (IgfsEx)igfsPrimary[0]; @@ -424,7 +427,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { checkBlockMetrics(initMetrics, igfs.metrics(), 0, 0, 0, 3, 0, blockSize * 3); // Read data from the first file. - IgfsInputStreamAdapter is = igfs.open(file1); + IgfsInputStream is = igfs.open(file1); is.readFully(0, new byte[blockSize * 2]); is.close(); @@ -432,7 +435,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { // Read data from the second file with hits. is = igfs.open(file2); - is.readChunks(0, blockSize); + is.read(new byte[blockSize]); is.close(); checkBlockMetrics(initMetrics, igfs.metrics(), 3, 0, blockSize * 3, 3, 0, blockSize * 3); @@ -449,7 +452,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { // Read remote file. is = igfs.open(fileRemote); - is.readChunks(0, rmtBlockSize); + is.read(new byte[rmtBlockSize]); is.close(); checkBlockMetrics(initMetrics, igfs.metrics(), 4, 1, blockSize * 3 + rmtBlockSize, 3, 0, blockSize * 3); @@ -459,7 +462,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { // Read remote file again. is = igfs.open(fileRemote); - is.readChunks(0, rmtBlockSize); + is.read(new byte[rmtBlockSize]); is.close(); checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 3, 0, blockSize * 3); @@ -495,16 +498,6 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest { checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 5, 1, blockSize * 7 / 2 + rmtBlockSize); - // Now read partial block. - // Read remote file again. - is = igfs.open(file1); - is.seek(blockSize * 2); - is.readChunks(0, blockSize / 2); - is.close(); - - checkBlockMetrics(initMetrics, igfs.metrics(), 6, 1, blockSize * 7 / 2 + rmtBlockSize * 2, 5, 1, - blockSize * 7 / 2 + rmtBlockSize); - igfs.resetMetrics(); metrics = igfs.metrics(); http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java index 0138907..2b989c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java @@ -23,6 +23,7 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsMetrics; import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsOutputStream; @@ -75,21 +76,22 @@ public class IgfsMock implements IgfsEx { } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException { + @Override public IgfsInputStream open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) + throws IgniteException { throwUnsupported(); return null; } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException { + @Override public IgfsInputStream open(IgfsPath path) throws IgniteException { throwUnsupported(); return null; } /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException { + @Override public IgfsInputStream open(IgfsPath path, int bufSize) throws IgniteException { throwUnsupported(); return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index f426243..3220538 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.IgfsPathSummary; @@ -35,7 +36,6 @@ import org.apache.ignite.igfs.IgfsUserContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.igfs.IgfsEx; import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter; import org.apache.ignite.internal.processors.igfs.IgfsStatus; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -316,9 +316,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { try { return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { @Override public HadoopIgfsStreamDelegate apply() { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); + IgfsInputStream stream = igfs.open(path, bufSize); - return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length()); } }); } @@ -336,9 +336,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { try { return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { @Override public HadoopIgfsStreamDelegate apply() { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length()); } }); } @@ -394,7 +394,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, @Nullable byte[] outBuf, int outOff, int outLen) { - IgfsInputStreamAdapter stream = delegate.target(); + IgfsInputStream stream = delegate.target(); try { byte[] res = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java index 8dc2717..7ee318a 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -34,11 +34,11 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.igfs.IgfsInputStream; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -398,7 +398,7 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest { "location '/result' as " + qry )); - IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0")); + IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0")); byte[] buf = new byte[(int) in.length()];