This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new b55107f HDDS-5550. EC: Adapt KeyInputStream to read EC Blocks (#2653)
b55107f is described below
commit b55107fe76db6fe889ded42dcda19fe853044ce1
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Sep 22 15:51:35 2021 +0100
HDDS-5550. EC: Adapt KeyInputStream to read EC Blocks (#2653)
---
.../hdds/scm/storage/BlockExtendedInputStream.java | 18 +--
.../hadoop/hdds/scm/storage/BlockInputStream.java | 53 +-------
.../hdds/scm/storage/ExtendedInputStream.java | 91 ++++++++++++++
.../ozone/client/io/BlockInputStreamFactory.java | 55 +++++++++
.../client/io/BlockInputStreamFactoryImpl.java | 70 +++++++++++
.../client/io/BlockInputStreamProviderImpl.java | 51 --------
.../hadoop/ozone/client/io/ECBlockInputStream.java | 137 ++++++++++++---------
.../hadoop/ozone/client/io/KeyInputStream.java | 60 +++++----
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 10 +-
.../hadoop/ozone/client/TestOzoneECClient.java | 54 ++++----
.../ozone/client/rpc/TestECKeyOutputStream.java | 10 ++
.../rpc/read/TestBlockInputStreamFactoryImpl.java | 111 +++++++++++++++++
.../client/rpc/read/TestChunkInputStream.java | 7 +-
.../client/rpc/read/TestECBlockInputStream.java | 124 ++++++++++++-------
.../ozone/client/rpc/read/TestInputStreamBase.java | 11 +-
15 files changed, 582 insertions(+), 280 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
similarity index 62%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
index d1cc74a..5be2b07 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProvider.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockExtendedInputStream.java
@@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.ozone.client.io;
+package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
/**
- * Interface used to create BlockInputStreams.
+ * Abstract class used as an interface for input streams related to Ozone
+ * blocks.
*/
-public interface BlockInputStreamProvider {
+public abstract class BlockExtendedInputStream extends ExtendedInputStream {
- BlockInputStream provide(BlockID blockId, long blockLen, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum);
+ public abstract BlockID getBlockID();
+
+ public abstract long getRemaining();
+
+ public abstract long getLength();
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index fea14bf..643ab0a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.storage;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,9 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.CanUnbuffer;
-import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
@@ -59,14 +55,11 @@ import org.slf4j.LoggerFactory;
* This class encapsulates all state management for iterating
* through the sequence of chunks through {@link ChunkInputStream}.
*/
-public class BlockInputStream extends InputStream
- implements Seekable, CanUnbuffer, ByteBufferReadable {
+public class BlockInputStream extends BlockExtendedInputStream {
private static final Logger LOG =
LoggerFactory.getLogger(BlockInputStream.class);
- private static final int EOF = -1;
-
private final BlockID blockID;
private final long length;
private Pipeline pipeline;
@@ -250,46 +243,14 @@ public class BlockInputStream extends InputStream
xceiverClientFactory, () -> pipeline, verifyChecksum, token);
}
+ @Override
public synchronized long getRemaining() {
return length - getPos();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized int read() throws IOException {
- byte[] buf = new byte[1];
- if (read(buf, 0, 1) == EOF) {
- return EOF;
- }
- return Byte.toUnsignedInt(buf[0]);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public synchronized int read(byte[] b, int off, int len) throws IOException {
- ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
- if (len == 0) {
- return 0;
- }
- return readWithStrategy(strategy);
- }
-
@Override
- public synchronized int read(ByteBuffer byteBuffer) throws IOException {
- ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
- int len = strategy.getTargetLength();
- if (len == 0) {
- return 0;
- }
- return readWithStrategy(strategy);
- }
-
- synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
- IOException {
+ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+ throws IOException {
Preconditions.checkArgument(strategy != null);
if (!initialized) {
initialize();
@@ -448,10 +409,6 @@ public class BlockInputStream extends InputStream
}
}
- public synchronized void resetPosition() {
- this.blockPosition = 0;
- }
-
/**
* Checks if the stream is open. If not, throw an exception.
*
@@ -463,10 +420,12 @@ public class BlockInputStream extends InputStream
}
}
+ @Override
public BlockID getBlockID() {
return blockID;
}
+ @Override
public long getLength() {
return length;
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
new file mode 100644
index 0000000..d09afe1
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ExtendedInputStream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Abstract class which extends InputStream and some common interfaces used by
+ * various Ozone InputStream classes.
+ */
+public abstract class ExtendedInputStream extends InputStream
+ implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+ protected static final int EOF = -1;
+
+ @Override
+ public synchronized int read() throws IOException {
+ byte[] buf = new byte[1];
+ if (read(buf, 0, 1) == EOF) {
+ return EOF;
+ }
+ return Byte.toUnsignedInt(buf[0]);
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
+ int bufferLen = strategy.getTargetLength();
+ if (bufferLen == 0) {
+ return 0;
+ }
+ return readWithStrategy(strategy);
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer byteBuffer) throws IOException {
+ ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
+ int bufferLen = strategy.getTargetLength();
+ if (bufferLen == 0) {
+ return 0;
+ }
+ return readWithStrategy(strategy);
+ }
+
+ /**
+ * This must be overridden by the extending classes to call read on the
+ * underlying stream they are reading from. The last stream in the chain (the
+ * one which provides the actual data) needs to provide a real read via the
+ * read methods. For example if a test is extending this class, then it will
+ * need to override both read methods above and provide a dummy
+ * readWithStrategy implementation, as it will never be called by the tests.
+ *
+ * @param strategy
+ * @return
+ * @throws IOException
+ */
+ protected abstract int readWithStrategy(ByteReaderStrategy strategy) throws
+ IOException;
+
+ @Override
+ public synchronized void seek(long l) throws IOException {
+ throw new NotImplementedException("Seek is not implements for EC");
+ }
+
+ @Override
+ public synchronized boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
new file mode 100644
index 0000000..d1bf7f3
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Interface used by classes which need to obtain BlockStream instances.
+ */
+public interface BlockInputStreamFactory {
+
+ /**
+ * Create a new BlockInputStream based on the replication Config. If the
+ * replication Config indicates the block is EC, then it will create an
+ * ECBlockInputStream, otherwise a BlockInputStream will be returned.
+ * @param repConfig The replication Config
+ * @param blockInfo The blockInfo representing the block.
+ * @param pipeline The pipeline to be used for reading the block
+ * @param token The block Access Token
+ * @param verifyChecksum Whether to verify checksums or not.
+ * @param xceiverFactory Factory to create the xceiver in the client
+ * @param refreshFunction Function to refresh the pipeline if needed
+ * @return BlockExtendedInputStream of the correct type.
+ */
+ BlockExtendedInputStream create(ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction);
+
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
new file mode 100644
index 0000000..1d372a7
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Factory class to create various BlockStream instances.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+ public static BlockInputStreamFactory getInstance() {
+ return new BlockInputStreamFactoryImpl();
+ }
+
+ /**
+ * Create a new BlockInputStream based on the replication Config. If the
+ * replication Config indicates the block is EC, then it will create an
+ * ECBlockInputStream, otherwise a BlockInputStream will be returned.
+ * @param repConfig The replication Config
+ * @param blockInfo The blockInfo representing the block.
+ * @param pipeline The pipeline to be used for reading the block
+ * @param token The block Access Token
+ * @param verifyChecksum Whether to verify checksums or not.
+ * @param xceiverFactory Factory to create the xceiver in the client
+ * @param refreshFunction Function to refresh the pipeline if needed
+ * @return BlockExtendedInputStream of the correct type.
+ */
+ public BlockExtendedInputStream create(ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction) {
+ if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
+ return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
+ verifyChecksum, xceiverFactory, refreshFunction, this);
+ } else {
+ return new BlockInputStream(blockInfo.getBlockID(), blockInfo.getLength(),
+ pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+ }
+ }
+
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
deleted file mode 100644
index 507dadc..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.client.io;
-
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
-
-import java.util.function.Function;
-
-/**
- * Concrete implementation of a BlockInputStreamProvider to create
- * BlockInputStreams in a real cluster.
- */
-public class BlockInputStreamProviderImpl implements BlockInputStreamProvider {
-
- private final XceiverClientFactory xceiverClientFactory;
- private final Function<BlockID, Pipeline> refreshFunction;
-
- public BlockInputStreamProviderImpl(XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
- this.xceiverClientFactory = xceiverFactory;
- this.refreshFunction = refreshFunction;
- }
-
- @Override
- public BlockInputStream provide(BlockID blockId, long blockLen,
- Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum) {
- return new BlockInputStream(blockId, blockLen, pipeline, token,
- verifyChecksum, xceiverClientFactory, refreshFunction);
- }
-}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 9c9d0a6..f919528 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -19,60 +19,77 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.NotImplementedException;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
-import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.function.Function;
/**
* Class to read data from an EC Block Group.
*/
-public class ECBlockInputStream extends InputStream
- implements Seekable, CanUnbuffer, ByteBufferReadable {
+public class ECBlockInputStream extends BlockExtendedInputStream {
- private static final int EOF = -1;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECBlockInputStream.class);
private final ECReplicationConfig repConfig;
+ // TODO - HDDS-5741 - remove hardcoded value
+ private static final int HARDCODED_CHUNK_SIZE = 1024;
private final int ecChunkSize;
- private final BlockInputStreamProvider streamProvider;
+ private final BlockInputStreamFactory streamFactory;
private final boolean verifyChecksum;
+ private final XceiverClientFactory xceiverClientFactory;
+ private final Function<BlockID, Pipeline> refreshFunction;
private final OmKeyLocationInfo blockInfo;
private final DatanodeDetails[] dataLocations;
private final DatanodeDetails[] parityLocations;
- private final BlockInputStream[] blockStreams;
+ private final BlockExtendedInputStream[] blockStreams;
private final int maxLocations;
- private int position = 0;
+ private long position = 0;
private boolean closed = false;
+ public ECBlockInputStream(ECReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ XceiverClientFactory xceiverClientFactory, Function<BlockID,
+ Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+
+ this(repConfig, HARDCODED_CHUNK_SIZE, blockInfo, verifyChecksum,
+ xceiverClientFactory, refreshFunction, streamFactory);
+ }
+
+ // TODO - HDDS-5741 - remove this constructor - ecChunkSize should not be
+ // there
public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
OmKeyLocationInfo blockInfo, boolean verifyChecksum,
- BlockInputStreamProvider streamProvider) {
+ XceiverClientFactory xceiverClientFactory, Function<BlockID,
+ Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
+ // TODO - HDDS-5741
this.ecChunkSize = ecChunkSize;
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
- this.streamProvider = streamProvider;
+ this.streamFactory = streamFactory;
+ this.xceiverClientFactory = xceiverClientFactory;
+ this.refreshFunction = refreshFunction;
this.maxLocations = repConfig.getData() + repConfig.getParity();
this.dataLocations = new DatanodeDetails[repConfig.getData()];
this.parityLocations = new DatanodeDetails[repConfig.getParity()];
- this.blockStreams = new BlockInputStream[repConfig.getData()];
+ this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
setBlockLocations(this.blockInfo.getPipeline());
}
@@ -105,7 +122,7 @@ public class ECBlockInputStream extends InputStream
* @return
*/
private int currentStreamIndex() {
- return ((position / ecChunkSize) % repConfig.getData());
+ return (int)((position / ecChunkSize) % repConfig.getData());
}
/**
@@ -114,9 +131,9 @@ public class ECBlockInputStream extends InputStream
* stream if it has not been opened already.
* @return BlockInput stream to read from.
*/
- private BlockInputStream getOrOpenStream() {
+ private BlockExtendedInputStream getOrOpenStream() {
int ind = currentStreamIndex();
- BlockInputStream stream = blockStreams[ind];
+ BlockExtendedInputStream stream = blockStreams[ind];
if (stream == null) {
// To read an EC block, we create a STANDALONE pipeline that contains the
// single location for the block index we want to read. The EC blocks are
@@ -130,9 +147,18 @@ public class ECBlockInputStream extends InputStream
.setState(Pipeline.PipelineState.CLOSED)
.build();
- stream = streamProvider.provide(blockInfo.getBlockID(),
- internalBlockLength(ind+1), pipeline, blockInfo.getToken(),
- verifyChecksum);
+ OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
+ .setBlockID(blockInfo.getBlockID())
+ .setLength(internalBlockLength(ind+1))
+ .setPipeline(blockInfo.getPipeline())
+ .setToken(blockInfo.getToken())
+ .setPartNumber(blockInfo.getPartNumber())
+ .build();
+ stream = streamFactory.create(
+ new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline,
+ blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
+ refreshFunction);
blockStreams[ind] = stream;
}
return stream;
@@ -188,35 +214,6 @@ public class ECBlockInputStream extends InputStream
return blockLength() - position;
}
- @Override
- public synchronized int read() throws IOException {
- byte[] buf = new byte[1];
- if (read(buf, 0, 1) == EOF) {
- return EOF;
- }
- return Byte.toUnsignedInt(buf[0]);
- }
-
- @Override
- public synchronized int read(byte[] b, int off, int len) throws IOException {
- ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
- int bufferLen = strategy.getTargetLength();
- if (bufferLen == 0) {
- return 0;
- }
- return readWithStrategy(strategy);
- }
-
- @Override
- public synchronized int read(ByteBuffer byteBuffer) throws IOException {
- ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
- int bufferLen = strategy.getTargetLength();
- if (bufferLen == 0) {
- return 0;
- }
- return readWithStrategy(strategy);
- }
-
/**
* Read from the internal BlockInputStreams one EC cell at a time into the
* strategy buffer. This call may read from several internal BlockInputStreams
@@ -225,8 +222,9 @@ public class ECBlockInputStream extends InputStream
* @return
* @throws IOException
*/
- private synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
- IOException {
+ @Override
+ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+ throws IOException {
Preconditions.checkArgument(strategy != null);
checkOpen();
@@ -236,7 +234,7 @@ public class ECBlockInputStream extends InputStream
int totalRead = 0;
while(strategy.getTargetLength() > 0 && remaining() > 0) {
- BlockInputStream stream = getOrOpenStream();
+ BlockExtendedInputStream stream = getOrOpenStream();
int read = readFromStream(stream, strategy);
totalRead += read;
position += read;
@@ -244,6 +242,21 @@ public class ECBlockInputStream extends InputStream
return totalRead;
}
+ @Override
+ public synchronized long getRemaining() {
+ return blockInfo.getLength() - position;
+ }
+
+ @Override
+ public synchronized long getLength() {
+ return blockInfo.getLength();
+ }
+
+ @Override
+ public BlockID getBlockID() {
+ return blockInfo.getBlockID();
+ }
+
/**
* Read the most allowable amount of data from the current stream. This
* ensures we don't read past the end of an EC cell or the overall block
@@ -253,7 +266,7 @@ public class ECBlockInputStream extends InputStream
* @return
* @throws IOException
*/
- private int readFromStream(BlockInputStream stream,
+ private int readFromStream(BlockExtendedInputStream stream,
ByteReaderStrategy strategy)
throws IOException {
// Number of bytes left to read from this streams EC cell.
@@ -288,9 +301,13 @@ public class ECBlockInputStream extends InputStream
@Override
public synchronized void close() {
- for (BlockInputStream stream : blockStreams) {
+ for (BlockExtendedInputStream stream : blockStreams) {
if (stream != null) {
- stream.close();
+ try {
+ stream.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close stream {}", stream, e);
+ }
}
}
closed = true;
@@ -298,7 +315,7 @@ public class ECBlockInputStream extends InputStream
@Override
public synchronized void unbuffer() {
- for (BlockInputStream stream : blockStreams) {
+ for (BlockExtendedInputStream stream : blockStreams) {
if (stream != null) {
stream.unbuffer();
}
@@ -311,7 +328,7 @@ public class ECBlockInputStream extends InputStream
}
@Override
- public synchronized long getPos() throws IOException {
+ public synchronized long getPos() {
return position;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index e84e39a..0d0b167 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.client.io;
import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,17 +29,17 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.hdds.scm.storage.ExtendedInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -52,8 +51,7 @@ import org.slf4j.LoggerFactory;
/**
* Maintaining a list of BlockInputStream. Read based on offset.
*/
-public class KeyInputStream extends InputStream
- implements Seekable, CanUnbuffer, ByteBufferReadable {
+public class KeyInputStream extends ExtendedInputStream {
private static final Logger LOG =
LoggerFactory.getLogger(KeyInputStream.class);
@@ -65,7 +63,7 @@ public class KeyInputStream extends InputStream
private boolean closed = false;
// List of BlockInputStreams, one for each block in the key
- private final List<BlockInputStream> blockStreams;
+ private final List<BlockExtendedInputStream> blockStreams;
// blockOffsets[i] stores the index of the first data byte in
// blockStream w.r.t the key data.
@@ -93,20 +91,23 @@ public class KeyInputStream extends InputStream
*/
public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
XceiverClientFactory xceiverClientFactory,
- boolean verifyChecksum, Function<OmKeyInfo, OmKeyInfo> retryFunction) {
+ boolean verifyChecksum, Function<OmKeyInfo, OmKeyInfo> retryFunction,
+ BlockInputStreamFactory blockStreamFactory) {
List<OmKeyLocationInfo> keyLocationInfos = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly();
KeyInputStream keyInputStream = new KeyInputStream();
keyInputStream.initialize(keyInfo, keyLocationInfos,
- xceiverClientFactory, verifyChecksum, retryFunction);
+ xceiverClientFactory, verifyChecksum, retryFunction,
+ blockStreamFactory);
return new LengthInputStream(keyInputStream, keyInputStream.length);
}
public static List<LengthInputStream> getStreamsFromKeyInfo(OmKeyInfo keyInfo,
XceiverClientFactory xceiverClientFactory, boolean verifyChecksum,
- Function<OmKeyInfo, OmKeyInfo> retryFunction) {
+ Function<OmKeyInfo, OmKeyInfo> retryFunction,
+ BlockInputStreamFactory blockStreamFactory) {
List<OmKeyLocationInfo> keyLocationInfos = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly();
@@ -137,7 +138,8 @@ public class KeyInputStream extends InputStream
partsToBlocksMap.entrySet()) {
KeyInputStream keyInputStream = new KeyInputStream();
keyInputStream.initialize(keyInfo, entry.getValue(),
- xceiverClientFactory, verifyChecksum, retryFunction);
+ xceiverClientFactory, verifyChecksum, retryFunction,
+ blockStreamFactory);
lengthInputStreams.add(new LengthInputStream(keyInputStream,
partsLengthMap.get(entry.getKey())));
}
@@ -148,7 +150,8 @@ public class KeyInputStream extends InputStream
private synchronized void initialize(OmKeyInfo keyInfo,
List<OmKeyLocationInfo> blockInfos,
XceiverClientFactory xceiverClientFactory,
- boolean verifyChecksum, Function<OmKeyInfo, OmKeyInfo> retryFunction) {
+ boolean verifyChecksum, Function<OmKeyInfo, OmKeyInfo> retryFunction,
+ BlockInputStreamFactory blockStreamFactory) {
this.key = keyInfo.getKeyName();
this.blockOffsets = new long[blockInfos.size()];
long keyLength = 0;
@@ -161,7 +164,8 @@ public class KeyInputStream extends InputStream
// We also pass in functional reference which is used to refresh the
// pipeline info for a given OM Key location info.
- addStream(omKeyLocationInfo, xceiverClientFactory,
+ addStream(keyInfo.getReplicationConfig(), omKeyLocationInfo,
+ xceiverClientFactory,
verifyChecksum, keyLocationInfo -> {
OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo);
BlockID blockID = keyLocationInfo.getBlockID();
@@ -176,7 +180,7 @@ public class KeyInputStream extends InputStream
} else {
return null;
}
- });
+ }, blockStreamFactory);
this.blockOffsets[i] = keyLength;
keyLength += omKeyLocationInfo.getLength();
@@ -190,12 +194,13 @@ public class KeyInputStream extends InputStream
* BlockInputStream is initialized when a read operation is performed on
* the block for the first time.
*/
- private synchronized void addStream(OmKeyLocationInfo blockInfo,
- XceiverClientFactory xceiverClientFactory,
- boolean verifyChecksum,
- Function<OmKeyLocationInfo, Pipeline> refreshPipelineFunction) {
- blockStreams.add(new BlockInputStream(blockInfo.getBlockID(),
- blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(),
+ private synchronized void addStream(ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo,
+ XceiverClientFactory xceiverClientFactory, boolean verifyChecksum,
+ Function<OmKeyLocationInfo, Pipeline> refreshPipelineFunction,
+ BlockInputStreamFactory blockStreamFactory) {
+ blockStreams.add(blockStreamFactory.create(repConfig, blockInfo,
+ blockInfo.getPipeline(), blockInfo.getToken(),
verifyChecksum, xceiverClientFactory,
blockID -> refreshPipelineFunction.apply(blockInfo)));
}
@@ -240,8 +245,9 @@ public class KeyInputStream extends InputStream
return readWithStrategy(strategy);
}
- synchronized int readWithStrategy(ByteReaderStrategy strategy) throws
- IOException {
+ @Override
+ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+ throws IOException {
Preconditions.checkArgument(strategy != null);
checkOpen();
@@ -257,7 +263,7 @@ public class KeyInputStream extends InputStream
}
// Get the current blockStream and read data from it
- BlockInputStream current = blockStreams.get(blockIndex);
+ BlockExtendedInputStream current = blockStreams.get(blockIndex);
int numBytesToRead = Math.min(buffLen, (int)current.getRemaining());
int numBytesRead = strategy.readFromBlock(current, numBytesToRead);
if (numBytesRead != numBytesToRead) {
@@ -326,7 +332,7 @@ public class KeyInputStream extends InputStream
}
// Reset the previous blockStream's position
- blockStreams.get(blockIndexOfPrevPosition).resetPosition();
+ blockStreams.get(blockIndexOfPrevPosition).seek(0);
// Reset all the blockStreams above the blockIndex. We do this to reset
// any previous reads which might have updated the blockPosition and
@@ -360,7 +366,7 @@ public class KeyInputStream extends InputStream
@Override
public void close() throws IOException {
closed = true;
- for (BlockInputStream blockStream : blockStreams) {
+ for (ExtendedInputStream blockStream : blockStreams) {
blockStream.close();
}
}
@@ -400,13 +406,13 @@ public class KeyInputStream extends InputStream
@Override
public void unbuffer() {
- for (BlockInputStream is : blockStreams) {
+ for (ExtendedInputStream is : blockStreams) {
is.unbuffer();
}
}
@VisibleForTesting
- public List<BlockInputStream> getBlockStreams() {
+ public List<BlockExtendedInputStream> getBlockStreams() {
return blockStreams;
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 01fee21..0d79676 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -73,6 +73,7 @@ import
org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
@@ -1347,7 +1348,8 @@ public class RpcClient implements ClientProtocol {
if (feInfo == null) {
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
- clientConfig.isChecksumVerify(), retryFunction);
+ clientConfig.isChecksumVerify(), retryFunction,
+ BlockInputStreamFactoryImpl.getInstance());
try {
Map< String, String > keyInfoMetadata = keyInfo.getMetadata();
if (Boolean.valueOf(keyInfoMetadata.get(OzoneConsts.GDPR_FLAG))) {
@@ -1367,7 +1369,8 @@ public class RpcClient implements ClientProtocol {
// Regular Key with FileEncryptionInfo
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
- clientConfig.isChecksumVerify(), retryFunction);
+ clientConfig.isChecksumVerify(), retryFunction,
+ BlockInputStreamFactoryImpl.getInstance());
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
final CryptoInputStream cryptoIn =
new CryptoInputStream(lengthInputStream.getWrappedStream(),
@@ -1378,7 +1381,8 @@ public class RpcClient implements ClientProtocol {
// Multipart Key with FileEncryptionInfo
List<LengthInputStream> lengthInputStreams = KeyInputStream
.getStreamsFromKeyInfo(keyInfo, xceiverClientManager,
- clientConfig.isChecksumVerify(), retryFunction);
+ clientConfig.isChecksumVerify(), retryFunction,
+ BlockInputStreamFactoryImpl.getInstance());
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index fed29b4..cf5872c 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -184,29 +184,12 @@ public class TestOzoneECClient {
Assert.assertEquals(keyName, key.getName());
try (OzoneInputStream is = bucket.readKey(keyName)) {
byte[] fileContent = new byte[1024];
- Assert.assertEquals(inputChunks[0].length, is.read(fileContent));
- Assert.assertEquals(new String(inputChunks[0], UTF_8),
- new String(fileContent, UTF_8));
- }
-
- // Since EC read is not ready yet, let's use the regular read by
- // tweaking the pipeline.
- // Remove first node in EC pipeline. So, regular read will hit the
- // first node in pipeline and assert for second chunk in EC data.
- updatePipelineToKeepSingleNode(2);
- try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] fileContent = new byte[1024];
- Assert.assertEquals(inputChunks[1].length, is.read(fileContent));
- Assert.assertEquals(new String(inputChunks[1], UTF_8),
- new String(fileContent, UTF_8));
- }
-
- updatePipelineToKeepSingleNode(3);
- try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] fileContent = new byte[1024];
- Assert.assertEquals(inputChunks[2].length, is.read(fileContent));
- Assert.assertEquals(new String(inputChunks[2], UTF_8),
- new String(fileContent, UTF_8));
+ for (int i=0; i<dataBlocks; i++) {
+ Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+ Assert.assertTrue(Arrays.equals(inputChunks[i], fileContent));
+ }
+ // A further read should give EOF
+ Assert.assertEquals(-1, is.read(fileContent));
}
}
@@ -308,7 +291,10 @@ public class TestOzoneECClient {
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- byte[] lastChunk = inputChunks[inputChunks.length - 1];
+ // Last chunk is one byte short of the others.
+ byte[] lastChunk =
+ Arrays.copyOf(inputChunks[inputChunks.length - 1],
+ inputChunks[inputChunks.length - 1].length - 1);
try (OzoneOutputStream out = bucket.createKey(keyName, 2000,
new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
@@ -316,20 +302,22 @@ public class TestOzoneECClient {
out.write(inputChunks[i]);
}
- for (int i = 0; i < lastChunk.length - 1; i++) {
+ for (int i = 0; i < lastChunk.length; i++) {
out.write(lastChunk[i]);
}
}
- // Making sure to keep only the 3rd node in pipeline, so that 3rd chunk can
- // be read.
- updatePipelineToKeepSingleNode(3);
try (OzoneInputStream is = bucket.readKey(keyName)) {
- byte[] fileContent = new byte[1023];
- Assert.assertEquals(lastChunk.length - 1, is.read(fileContent));
- Assert.assertEquals(
- new String(Arrays.copyOf(lastChunk, lastChunk.length - 1), UTF_8),
- new String(fileContent, UTF_8));
+ byte[] fileContent = new byte[1024];
+ for (int i=0; i<2; i++) {
+ Assert.assertEquals(inputChunks[i].length, is.read(fileContent));
+ Assert.assertTrue(Arrays.equals(inputChunks[i], fileContent));
+ }
+ Assert.assertEquals(lastChunk.length, is.read(fileContent));
+ Assert.assertTrue(Arrays.equals(lastChunk,
+ Arrays.copyOf(fileContent, lastChunk.length)));
+ // A further read should give EOF
+ Assert.assertEquals(-1, is.read(fileContent));
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 28f7c2c..b8169ca 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
import org.junit.AfterClass;
@@ -47,6 +48,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -163,6 +165,14 @@ public class TestECKeyOutputStream {
out.write(inputChunks[i]);
}
}
+ byte[] buf = new byte[chunkSize];
+ try (OzoneInputStream in = bucket.readKey(keyString)) {
+ for (int i=0; i< inputChunks.length; i++) {
+ int read = in.read(buf, 0, chunkSize);
+ Assert.assertEquals(chunkSize, read);
+ Assert.assertTrue(Arrays.equals(buf, inputChunks[i]));
+ }
+ }
}
@Test
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
new file mode 100644
index 0000000..84e1817
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Verifies BlockInputStreamFactoryImpl picks the stream type per replication.
+ */
+public class TestBlockInputStreamFactoryImpl {
+
+  /** A Ratis (non-EC) replication config must yield a BlockInputStream. */
+  @Test
+  public void testNonECGivesBlockInputStream() {
+    BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
+    ReplicationConfig repConfig =
+        new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
+
+    OmKeyLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
+        1024 * 1024 * 10);
+    // Client factory and refresh function are unused here, so pass null.
+    BlockExtendedInputStream stream =
+        factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
+            blockInfo.getToken(), true, null, null);
+    Assert.assertTrue(stream instanceof BlockInputStream);
+    Assert.assertEquals(blockInfo.getBlockID(), stream.getBlockID());
+    Assert.assertEquals(blockInfo.getLength(), stream.getLength());
+  }
+
+  /** An EC replication config must yield an ECBlockInputStream. */
+  @Test
+  public void testECGivesECBlockInputStream() {
+    BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
+    ReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    OmKeyLocationInfo blockInfo = createKeyLocationInfo(repConfig, 5,
+        1024 * 1024 * 10);
+    // Client factory and refresh function are unused here, so pass null.
+    BlockExtendedInputStream stream =
+        factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
+            blockInfo.getToken(), true, null, null);
+    Assert.assertTrue(stream instanceof ECBlockInputStream);
+    Assert.assertEquals(blockInfo.getBlockID(), stream.getBlockID());
+    Assert.assertEquals(blockInfo.getLength(), stream.getLength());
+  }
+
+  /** Builds a block location on a CLOSED pipeline over the given datanodes. */
+  private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
+      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setReplicationConfig(repConf)
+        .build();
+
+    return new OmKeyLocationInfo.Builder()
+        .setBlockID(new BlockID(1, 1))
+        .setLength(blockLength)
+        .setOffset(0)
+        .setPipeline(pipeline)
+        .setPartNumber(0)
+        .build();
+  }
+
+  /** Builds a block location over nodeCount random datanodes, indexed 1..n. */
+  private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
+      int nodeCount, long blockLength) {
+    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+    for (int i = 0; i < nodeCount; i++) {
+      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+    }
+    return createKeyLocationInfo(repConf, blockLength, datanodes);
+  }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
index fc051c8..6a77564 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc.read;
import java.io.IOException;
import java.nio.ByteBuffer;
+
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
@@ -47,7 +48,8 @@ public class TestChunkInputStream extends TestInputStreamBase
{
KeyInputStream keyInputStream = getKeyInputStream(keyName);
- BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
+ BlockInputStream block0Stream =
+ (BlockInputStream)keyInputStream.getBlockStreams().get(0);
block0Stream.initialize();
ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
@@ -112,7 +114,8 @@ public class TestChunkInputStream extends
TestInputStreamBase {
try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
- BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
+ BlockInputStream block0Stream =
+ (BlockInputStream)keyInputStream.getBlockStreams().get(0);
block0Stream.initialize();
ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index c5f9d0b..9790843 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -25,14 +25,16 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.client.io.BlockInputStreamProvider;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -51,12 +53,12 @@ public class TestECBlockInputStream {
private static final int ONEMB = 1024 * 1024;
private ECReplicationConfig repConfig;
- private TestBlockInputStreamProvider streamProvider;
+ private TestBlockInputStreamFactory streamFactory;
@Before
public void setup() {
repConfig = new ECReplicationConfig(3, 2);
- streamProvider = new TestBlockInputStreamProvider();
+ streamFactory = new TestBlockInputStreamFactory();
}
@Test
@@ -65,14 +67,14 @@ public class TestECBlockInputStream {
// EC-3-2, 5MB block, so all 3 data locations are needed
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, new TestBlockInputStreamProvider())) {
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// EC-3-2, very large block, so all 3 data locations are needed
keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, new TestBlockInputStreamProvider())) {
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
@@ -82,7 +84,7 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, new TestBlockInputStreamProvider())) {
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
@@ -91,7 +93,7 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, new TestBlockInputStreamProvider())) {
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
@@ -101,7 +103,7 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, new TestBlockInputStreamProvider())) {
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
@@ -113,7 +115,7 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, new TestBlockInputStreamProvider())) {
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
}
@@ -125,11 +127,11 @@ public class TestECBlockInputStream {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ecb.read(buf);
// We expect only 1 block stream and it should have a length passed of
// ONEMB - 100.
- List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
}
}
@@ -140,10 +142,10 @@ public class TestECBlockInputStream {
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
- try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+ keyInfo, true, null, null, streamFactory)) {
ecb.read(buf);
- List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
Assert.assertEquals(ONEMB, streams.get(0).getLength());
Assert.assertEquals(100, streams.get(1).getLength());
}
@@ -156,9 +158,9 @@ public class TestECBlockInputStream {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ecb.read(buf);
- List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
Assert.assertEquals(ONEMB, streams.get(0).getLength());
Assert.assertEquals(ONEMB, streams.get(1).getLength());
Assert.assertEquals(100, streams.get(2).getLength());
@@ -172,9 +174,9 @@ public class TestECBlockInputStream {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ecb.read(buf);
- List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
@@ -188,9 +190,9 @@ public class TestECBlockInputStream {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ecb.read(buf);
- List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
Assert.assertEquals(ONEMB, streams.get(0).getLength());
}
}
@@ -202,9 +204,9 @@ public class TestECBlockInputStream {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ecb.read(buf);
- List<TestBlockInputStream> streams = streamProvider.getBlockStreams();
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
Assert.assertEquals(3 * ONEMB, streams.get(0).getLength());
Assert.assertEquals(3 * ONEMB, streams.get(1).getLength());
Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
@@ -215,7 +217,7 @@ public class TestECBlockInputStream {
public void testSimpleRead() throws IOException {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ByteBuffer buf = ByteBuffer.allocate(100);
@@ -224,7 +226,7 @@ public class TestECBlockInputStream {
validateBufferContents(buf, 0, 100, (byte) 0);
Assert.assertEquals(100, ecb.getPos());
}
- for (TestBlockInputStream s : streamProvider.getBlockStreams()) {
+ for (TestBlockInputStream s : streamFactory.getBlockStreams()) {
Assert.assertTrue(s.isClosed());
}
}
@@ -233,7 +235,7 @@ public class TestECBlockInputStream {
public void testReadPastEOF() throws IOException {
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 50);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
- keyInfo, true, streamProvider)) {
+ keyInfo, true, null, null, streamFactory)) {
ByteBuffer buf = ByteBuffer.allocate(100);
@@ -244,12 +246,14 @@ public class TestECBlockInputStream {
}
}
+ @Ignore("HDDS-5741")
+ // TODO - HDDS-5741 this test needs the RepConfig codec to be set correctly
@Test
public void testReadCrossingMultipleECChunkBounds() throws IOException {
// EC-3-2, 5MB block, so all 3 data locations are needed
OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
- try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, 100,
- keyInfo, true, streamProvider)) {
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+ keyInfo, true, null, null, streamFactory)) {
// EC Chunk size is 100 and 3-2. Create a byte buffer to read 3.5 chunks,
// so 350
@@ -272,7 +276,7 @@ public class TestECBlockInputStream {
validateBufferContents(buf, 250, 350, (byte) 0);
}
- for (TestBlockInputStream s : streamProvider.getBlockStreams()) {
+ for (TestBlockInputStream s : streamFactory.getBlockStreams()) {
Assert.assertTrue(s.isClosed());
}
}
@@ -315,8 +319,8 @@ public class TestECBlockInputStream {
return createKeyInfo(repConf, blockLength, datanodes);
}
- private static class TestBlockInputStreamProvider implements
- BlockInputStreamProvider {
+ private static class TestBlockInputStreamFactory implements
+ BlockInputStreamFactory {
private List<TestBlockInputStream> blockStreams = new ArrayList<>();
@@ -324,44 +328,61 @@ public class TestECBlockInputStream {
return blockStreams;
}
- @Override
- public BlockInputStream provide(BlockID blockId, long blockLen,
- Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum) {
- TestBlockInputStream stream = new TestBlockInputStream(blockId, blockLen,
- pipeline, token, verifyChecksum, null, null,
+ public BlockExtendedInputStream create(ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, Pipeline pipeline,
+ Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction) {
+ TestBlockInputStream stream = new TestBlockInputStream(
+ blockInfo.getBlockID(), blockInfo.getLength(),
(byte)blockStreams.size());
blockStreams.add(stream);
return stream;
}
}
- private static class TestBlockInputStream extends BlockInputStream {
+ private static class TestBlockInputStream extends BlockExtendedInputStream {
private long position = 0;
private boolean closed = false;
private byte dataVal = 1;
+ private BlockID blockID;
+ private long length;
private static final byte EOF = -1;
@SuppressWarnings("checkstyle:parameternumber")
- TestBlockInputStream(BlockID blockId, long blockLen,
- Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum, XceiverClientFactory xceiverClientFactory,
- Function<BlockID, Pipeline> refreshPipelineFunction, byte dataVal) {
- super(blockId, blockLen, pipeline, token, verifyChecksum,
- xceiverClientFactory, refreshPipelineFunction);
+ TestBlockInputStream(BlockID blockId, long blockLen, byte dataVal) {
this.dataVal = dataVal;
+ this.blockID = blockId;
+ this.length = blockLen;
}
public boolean isClosed() {
return closed;
}
+ @Override
+ public BlockID getBlockID() {
+ return blockID;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
public long getRemaining() {
return getLength() - position;
}
@Override
+ public int read(byte[] b, int off, int len)
+ throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
public int read(ByteBuffer buf) {
if (getRemaining() == 0) {
return EOF;
@@ -373,12 +394,27 @@ public class TestECBlockInputStream {
}
position += toRead;
return toRead;
+ };
+
+ @Override
+ protected int readWithStrategy(ByteReaderStrategy strategy) throws
+ IOException {
+ throw new IOException("Should not be called");
}
@Override
public void close() {
closed = true;
}
+
+ @Override
+ public void unbuffer() {
+ }
+
+ @Override
+ public long getPos() {
+ return 0;
+ }
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index ccfd541..3b1ee58 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -219,19 +220,21 @@ public abstract class TestInputStreamBase {
// Verify BlockStreams and ChunkStreams
int expectedNumBlockStreams = BufferUtils.getNumberOfBins(
dataLength, BLOCK_SIZE);
- List<BlockInputStream> blockStreams = keyInputStream.getBlockStreams();
+ List<BlockExtendedInputStream> blockStreams =
+ keyInputStream.getBlockStreams();
Assert.assertEquals(expectedNumBlockStreams, blockStreams.size());
int readBlockLength = 0;
- for (BlockInputStream blockStream : blockStreams) {
+ for (BlockExtendedInputStream blockStream : blockStreams) {
int blockStreamLength = Math.min(BLOCK_SIZE,
dataLength - readBlockLength);
Assert.assertEquals(blockStreamLength, blockStream.getLength());
int expectedNumChunkStreams =
BufferUtils.getNumberOfBins(blockStreamLength, CHUNK_SIZE);
- blockStream.initialize();
- List<ChunkInputStream> chunkStreams = blockStream.getChunkStreams();
+ ((BlockInputStream)blockStream).initialize();
+ List<ChunkInputStream> chunkStreams =
+ ((BlockInputStream)blockStream).getChunkStreams();
Assert.assertEquals(expectedNumChunkStreams, chunkStreams.size());
int readChunkLength = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]