This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 4115827 HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes. (#2335)
4115827 is described below
commit 4115827ff9cb90bfc1ec6f4863a0a800d80b82c0
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Mon Jul 12 09:18:20 2021 -0700
HDDS-4940 : EC: Implement the ECKeyOutputStream which should handle the EC mode writes. (#2335)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 3 +-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 28 +-
.../hdds/scm/storage/ECBlockOutputStream.java | 118 ++++
hadoop-ozone/client/pom.xml | 4 +
.../ozone/client/io/BlockOutputStreamEntry.java | 16 +-
.../client/io/BlockOutputStreamEntryPool.java | 39 +-
.../ozone/client/io/ECBlockOutputStreamEntry.java | 126 ++++
.../client/io/ECBlockOutputStreamEntryPool.java | 149 +++++
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 648 +++++++++++++++++++++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 30 +-
.../hadoop/ozone/client/MockDatanodeStorage.java | 7 +-
.../hadoop/ozone/client/MockOmTransport.java | 4 +
.../ozone/client/MockXceiverClientFactory.java | 9 +-
.../client/MultiNodePipelineBlockAllocator.java | 71 +++
.../hadoop/ozone/client/TestOzoneECClient.java | 274 +++++++++
15 files changed, 1497 insertions(+), 29 deletions(-)
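A note on the write path this patch introduces: ECKeyOutputStream splits the incoming bytes into cells of ecChunkSize, places them round-robin on the d data streams, and after every full stripe of d cells it encodes and writes p parity cells before wrapping back to the first stream. A minimal, JDK-only sketch of that placement (an illustration, not part of this patch; all names are hypothetical):

import java.util.Arrays;

/** Illustrative only: how d data + p parity streams receive cells. */
public class StripePlacementDemo {
  public static void main(String[] args) {
    int d = 3, p = 2, cellSize = 1024;
    int totalCells = 5;                      // five full data cells
    int[] bytesPerStream = new int[d + p];
    int current = 0;                         // like pool.getCurrIdx()
    for (int c = 0; c < totalCells; c++) {
      bytesPerStream[current] += cellSize;   // buffer and write one data cell
      current = (current + 1) % (d + p);     // like updateToNextStream(...)
      if (current == d) {                    // a full stripe is buffered
        for (int j = d; j < d + p; j++) {
          bytesPerStream[j] += cellSize;     // encode and write parity cells
        }
        current = 0;                         // wrap back to the first stream
      }
    }
    System.out.println(Arrays.toString(bytesPerStream));
    // prints [2048, 2048, 1024, 1024, 1024]: one full stripe plus two cells
  }
}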
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index def810f..fea14bf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -199,7 +199,8 @@ public class BlockInputStream extends InputStream
protected List<ChunkInfo> getChunkInfos() throws IOException {
// irrespective of the container state, we will always read via Standalone
// protocol.
-    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
+        .getType() != HddsProtos.ReplicationType.EC) {
pipeline = Pipeline.newBuilder(pipeline)
.setReplicationConfig(new StandaloneReplicationConfig(
ReplicationConfig
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 92ce3c6..98bd157 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -219,6 +219,22 @@ public class BlockOutputStream extends OutputStream {
return ioException.get();
}
+  XceiverClientSpi getXceiverClientSpi() {
+    return this.xceiverClient;
+  }
+
+  BlockData.Builder getContainerBlockData() {
+    return this.containerBlockData;
+  }
+
+  Token<? extends TokenIdentifier> getToken() {
+    return this.token;
+  }
+
+  ExecutorService getResponseExecutor() {
+    return this.responseExecutor;
+  }
+
@VisibleForTesting
public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
return commitWatcher.getCommitIndex2flushedDataMap();
@@ -405,7 +421,7 @@ public class BlockOutputStream extends OutputStream {
* @param force true if no data was written since most recent putBlock and
* stream is being closed
*/
- private CompletableFuture<ContainerProtos.
+ CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force) throws IOException {
checkOpen();
@@ -573,7 +589,7 @@ public class BlockOutputStream extends OutputStream {
combinedFuture.get();
}
- private void validateResponse(
+ void validateResponse(
ContainerProtos.ContainerCommandResponseProto responseProto)
throws IOException {
try {
@@ -592,7 +608,7 @@ public class BlockOutputStream extends OutputStream {
}
- private void setIoException(Exception e) {
+ void setIoException(Exception e) {
IOException ioe = getIoException();
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(),
e);
@@ -624,7 +640,7 @@ public class BlockOutputStream extends OutputStream {
*
* @throws IOException if stream is closed
*/
- private void checkOpen() throws IOException {
+ void checkOpen() throws IOException {
if (isClosed()) {
throw new IOException("BlockOutputStream has been closed.");
} else if (getIoException() != null) {
@@ -645,7 +661,7 @@ public class BlockOutputStream extends OutputStream {
* @throws OzoneChecksumException if there is an error while computing
* checksum
*/
- private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+ void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
int effectiveChunkSize = chunk.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
final ByteString data = chunk.toByteString(
@@ -705,7 +721,7 @@ public class BlockOutputStream extends OutputStream {
* handle ExecutionException else skip it.
* @throws IOException
*/
- private void handleInterruptedException(Exception ex,
+ void handleInterruptedException(Exception ex,
boolean processExecutionException)
throws IOException {
LOG.error("Command execution was interrupted.");
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
new file mode 100644
index 0000000..30bd2b7
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * Handles the chunk EC writes for an EC internal block.
+ */
+public class ECBlockOutputStream extends BlockOutputStream {
+
+ /**
+ * Creates a new ECBlockOutputStream.
+ *
+ * @param blockID block ID
+ * @param xceiverClientManager client manager that controls client
+ * @param pipeline pipeline where block will be written
+ * @param bufferPool pool of buffers
+ */
+ public ECBlockOutputStream(
+ BlockID blockID,
+ XceiverClientFactory xceiverClientManager,
+ Pipeline pipeline,
+ BufferPool bufferPool,
+ OzoneClientConfig config,
+ Token<? extends TokenIdentifier> token
+ ) throws IOException {
+ super(blockID, xceiverClientManager,
+ pipeline, bufferPool, config, token);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+ }
+
+  /**
+   * Executes putBlock for this EC internal block and validates the
+   * response asynchronously.
+   *
+   * @param close whether putBlock is happening as part of closing the stream
+   * @param force true if no data was written since most recent putBlock and
+   *              stream is being closed
+   */
+ public CompletableFuture<ContainerProtos.
+ ContainerCommandResponseProto> executePutBlock(boolean close,
+ boolean force) throws IOException {
+ checkOpen();
+
+ CompletableFuture<ContainerProtos.
+ ContainerCommandResponseProto> flushFuture = null;
+ try {
+ ContainerProtos.BlockData blockData = getContainerBlockData().build();
+ XceiverClientReply asyncReply =
+          putBlockAsync(getXceiverClientSpi(), blockData, close, getToken());
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+ asyncReply.getResponse();
+ flushFuture = future.thenApplyAsync(e -> {
+ try {
+ validateResponse(e);
+ } catch (IOException sce) {
+ throw new CompletionException(sce);
+ }
+ // if the ioException is not set, putBlock is successful
+ if (getIoException() == null) {
+ BlockID responseBlockID = BlockID.getFromProtobuf(
+ e.getPutBlock().getCommittedBlockLength().getBlockID());
+ Preconditions.checkState(getBlockID().getContainerBlockID()
+ .equals(responseBlockID.getContainerBlockID()));
+ }
+ return e;
+ }, getResponseExecutor()).exceptionally(e -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putBlock failed for blockID {} with exception {}",
+ getBlockID(), e.getLocalizedMessage());
+ }
+ CompletionException ce = new CompletionException(e);
+ setIoException(ce);
+ throw ce;
+ });
+ } catch (IOException | ExecutionException e) {
+ throw new IOException(EXCEPTION_MSG + e.toString(), e);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ handleInterruptedException(ex, false);
+ }
+ return flushFuture;
+ }
+}
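The async putBlock flow above (validate on the response executor, convert failures into a CompletionException, record them via setIoException) is a standard CompletableFuture composition. A stripped-down, JDK-only sketch of the same shape (illustrative names; the real call is putBlockAsync and the real check is validateResponse):

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: the validate-then-record-failure future shape. */
public class AsyncValidationDemo {
  public static void main(String[] args) {
    ExecutorService responseExecutor = Executors.newSingleThreadExecutor();
    // stands in for putBlockAsync(...).getResponse()
    CompletableFuture<String> response =
        CompletableFuture.supplyAsync(() -> "OK");
    CompletableFuture<String> flushFuture = response.thenApplyAsync(r -> {
      if (!"OK".equals(r)) {                 // stands in for validateResponse(r)
        throw new CompletionException(new IOException("bad response: " + r));
      }
      return r;
    }, responseExecutor).exceptionally(e -> {
      // stands in for setIoException(...): record the failure, then rethrow
      System.err.println("putBlock failed: " + e.getLocalizedMessage());
      throw new CompletionException(e);
    });
    System.out.println(flushFuture.join()); // prints OK
    responseExecutor.shutdown();
  }
}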
diff --git a/hadoop-ozone/client/pom.xml b/hadoop-ozone/client/pom.xml
index e7635d0..56ebe56 100644
--- a/hadoop-ozone/client/pom.xml
+++ b/hadoop-ozone/client/pom.xml
@@ -43,6 +43,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 594bbf0..21c88cf 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -37,7 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
/**
* Helper class used inside {@link BlockOutputStream}.
* */
-public final class BlockOutputStreamEntry extends OutputStream {
+public class BlockOutputStreamEntry extends OutputStream {
private final OzoneClientConfig config;
private OutputStream outputStream;
@@ -54,7 +54,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
private BufferPool bufferPool;
@SuppressWarnings({"parameternumber", "squid:S00107"})
- private BlockOutputStreamEntry(
+ BlockOutputStreamEntry(
BlockID blockID, String key,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
@@ -95,12 +95,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
*/
private void checkStream() throws IOException {
if (this.outputStream == null) {
- this.outputStream =
- new BlockOutputStream(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token);
+ this.outputStream = createOutputStream();
}
}
+ BlockOutputStream createOutputStream() throws IOException {
+ return new BlockOutputStream(blockID, xceiverClientManager,
+ pipeline, bufferPool, config, token);
+ }
@Override
public void write(int b) throws IOException {
@@ -270,6 +272,10 @@ public final class BlockOutputStreamEntry extends OutputStream {
return key;
}
+  public OzoneClientConfig getConf() {
+ return this.config;
+ }
+
public XceiverClientFactory getXceiverClientManager() {
return xceiverClientManager;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index e9147a8..0af651c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -145,7 +145,7 @@ public class BlockOutputStreamEntryPool {
}
}
- private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+ void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
Preconditions.checkNotNull(subKeyInfo.getPipeline());
BlockOutputStreamEntry.Builder builder =
new BlockOutputStreamEntry.Builder()
@@ -160,9 +160,18 @@ public class BlockOutputStreamEntryPool {
streamEntries.add(builder.build());
}
- public List<OmKeyLocationInfo> getLocationInfoList() {
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return getOmKeyLocationInfos(streamEntries);
+  }
+
+ List<OmKeyLocationInfo> getOmKeyLocationInfos(
+ List<BlockOutputStreamEntry> streams) {
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
- for (BlockOutputStreamEntry streamEntry : streamEntries) {
+ for (BlockOutputStreamEntry streamEntry : streams) {
long length = streamEntry.getCurrentPosition();
// Commit only those blocks to OzoneManager which are not empty
@@ -184,6 +193,14 @@ public class BlockOutputStreamEntryPool {
return locationInfoList;
}
+ public BufferPool getBufferPool() {
+ return this.bufferPool;
+ }
+
+ public OzoneClientConfig getConfig() {
+ return config;
+ }
+
/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
@@ -224,8 +241,8 @@ public class BlockOutputStreamEntryPool {
}
long getKeyLength() {
- return streamEntries.stream().mapToLong(
- BlockOutputStreamEntry::getCurrentPosition).sum();
+ return streamEntries.stream()
+ .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
}
/**
* Contact OM to get a new block. Set the new block with the index (e.g.
@@ -274,6 +291,18 @@ public class BlockOutputStreamEntryPool {
}
}
+  public int getCurrIdx() {
+    return currentStreamIndex;
+  }
+
+  public void setCurrIdx(int currIdx) {
+    this.currentStreamIndex = currIdx;
+  }
+
+  public void updateToNextStream(int rotation) {
+    currentStreamIndex = (currentStreamIndex + 1) % rotation;
+  }
+
BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
if (streamEntry != null && streamEntry.isClosed()) {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
new file mode 100644
index 0000000..383ed17
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+/**
+ * Helper for {@link ECBlockOutputStream}.
+ */
+public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
+ private ECBlockOutputStream out;
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
+  ECBlockOutputStreamEntry(BlockID blockID, String key,
+      XceiverClientFactory xceiverClientManager, Pipeline pipeline,
+      long length, BufferPool bufferPool,
+      Token<OzoneBlockTokenIdentifier> token, OzoneClientConfig config) {
+ super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
+ token, config);
+ }
+
+ @Override
+ ECBlockOutputStream createOutputStream() throws IOException {
+ this.out = new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
+ getPipeline(), getBufferPool(), getConf(), getToken());
+ return this.out;
+ }
+
+ void executePutBlock() throws IOException {
+ this.out.executePutBlock(false, true);
+ }
+
+  /**
+   * Builder class for ECBlockOutputStreamEntry.
+   * */
+ public static class Builder {
+
+ private BlockID blockID;
+ private String key;
+ private XceiverClientFactory xceiverClientManager;
+ private Pipeline pipeline;
+ private long length;
+ private BufferPool bufferPool;
+ private Token<OzoneBlockTokenIdentifier> token;
+ private OzoneClientConfig config;
+
+ public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
+ this.blockID = bID;
+ return this;
+ }
+
+ public ECBlockOutputStreamEntry.Builder setKey(String keys) {
+ this.key = keys;
+ return this;
+ }
+
+ public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
+ XceiverClientFactory
+ xClientManager) {
+ this.xceiverClientManager = xClientManager;
+ return this;
+ }
+
+ public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
+ this.pipeline = ppln;
+ return this;
+ }
+
+
+ public ECBlockOutputStreamEntry.Builder setLength(long len) {
+ this.length = len;
+ return this;
+ }
+
+
+ public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
+ this.bufferPool = pool;
+ return this;
+ }
+
+ public ECBlockOutputStreamEntry.Builder setConfig(
+ OzoneClientConfig clientConfig) {
+ this.config = clientConfig;
+ return this;
+ }
+
+ public ECBlockOutputStreamEntry.Builder setToken(
+ Token<OzoneBlockTokenIdentifier> bToken) {
+ this.token = bToken;
+ return this;
+ }
+
+ public ECBlockOutputStreamEntry build() {
+ return new ECBlockOutputStreamEntry(blockID,
+ key,
+ xceiverClientManager,
+ pipeline,
+ length,
+ bufferPool,
+ token, config);
+ }
+ }
+}
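This entry works because of the small refactor to BlockOutputStreamEntry earlier in the patch: checkStream() now delegates to an overridable createOutputStream() factory method, so the EC subclass can substitute an ECBlockOutputStream without duplicating the lazy-initialization logic. A tiny JDK-only sketch of that pattern (hypothetical classes, for illustration only):

/** Illustrative only: the overridable-factory pattern used above. */
public class FactoryMethodDemo {
  static class Entry {
    private StringBuilder out;               // stands in for the OutputStream

    void write(String s) {
      if (out == null) {                     // lazy init, like checkStream()
        out = createOutputStream();
      }
      out.append(s);
    }

    StringBuilder createOutputStream() {     // overridden by the EC subclass
      return new StringBuilder("replicated:");
    }

    String contents() {
      return out.toString();
    }
  }

  static class EcEntry extends Entry {
    @Override
    StringBuilder createOutputStream() {
      return new StringBuilder("ec:");
    }
  }

  public static void main(String[] args) {
    Entry entry = new EcEntry();
    entry.write("cell-0");
    System.out.println(entry.contents());    // prints ec:cell-0
  }
}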
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
new file mode 100644
index 0000000..ca76b75
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages the stream entries list and handles block allocation
+ * from OzoneManager for EC writes.
+ */
+public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
+ private final List<BlockOutputStreamEntry> finishedStreamEntries;
+ private final ECReplicationConfig ecReplicationConfig;
+
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
+ public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
+ OzoneManagerProtocol omClient,
+ String requestId,
+ ReplicationConfig replicationConfig,
+ String uploadID,
+ int partNumber,
+ boolean isMultipart,
+ OmKeyInfo info,
+ boolean unsafeByteBufferConversion,
+ XceiverClientFactory xceiverClientFactory,
+ long openID) {
+ super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
+ isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
+ openID);
+ this.finishedStreamEntries = new ArrayList<>();
+ assert replicationConfig instanceof ECReplicationConfig;
+ this.ecReplicationConfig = (ECReplicationConfig) replicationConfig;
+ }
+
+ @Override
+ void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+ Preconditions.checkNotNull(subKeyInfo.getPipeline());
+ List<DatanodeDetails> nodes = subKeyInfo.getPipeline().getNodes();
+ for (int i = 0; i < nodes.size(); i++) {
+ List<DatanodeDetails> nodeStatus = new ArrayList<>();
+ nodeStatus.add(nodes.get(i));
+ Map<DatanodeDetails, Integer> nodeVsIdx = new HashMap<>();
+ nodeVsIdx.put(nodes.get(i), i + 1);
+ Pipeline pipeline =
+ Pipeline.newBuilder().setId(subKeyInfo.getPipeline().getId())
+ .setReplicationConfig(
+ subKeyInfo.getPipeline().getReplicationConfig())
+ .setState(subKeyInfo.getPipeline().getPipelineState())
+ .setNodes(nodeStatus).setReplicaIndexes(nodeVsIdx).build();
+
+ ECBlockOutputStreamEntry.Builder builder =
+ new ECBlockOutputStreamEntry.Builder()
+ .setBlockID(subKeyInfo.getBlockID()).setKey(getKeyName())
+ .setXceiverClientManager(getXceiverClientFactory())
+ .setPipeline(pipeline).setConfig(getConfig())
+ .setLength(subKeyInfo.getLength()).setBufferPool(getBufferPool())
+ .setToken(subKeyInfo.getToken());
+ getStreamEntries().add(builder.build());
+ }
+ }
+
+  @Override
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    List<OmKeyLocationInfo> locationInfoList =
+        getOmKeyLocationInfos(finishedStreamEntries);
+    locationInfoList.addAll(getOmKeyLocationInfos(getStreamEntries()));
+    return locationInfoList;
+  }
+
+  @Override
+  long getKeyLength() {
+    // Only data internal blocks (replica index <= d) count towards the
+    // logical key length; parity blocks are excluded.
+    return dataBlockSum(getStreamEntries())
+        + dataBlockSum(finishedStreamEntries);
+  }
+
+  private long dataBlockSum(List<BlockOutputStreamEntry> entries) {
+    return entries.stream().filter(c -> c.getPipeline().getReplicaIndex(
+        c.getPipeline().getNodes().iterator().next())
+        <= ecReplicationConfig.getData())
+        .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
+  }
+
+  public void endECBlock() throws IOException {
+    List<BlockOutputStreamEntry> entries = getStreamEntries();
+    finishedStreamEntries.addAll(entries);
+    entries.clear();
+
+    for (BlockOutputStreamEntry entry : finishedStreamEntries) {
+      entry.close();
+    }
+    super.cleanup();
+  }
+
+  void executePutBlockForAll() throws IOException {
+    for (BlockOutputStreamEntry entry : getStreamEntries()) {
+      ((ECBlockOutputStreamEntry) entry).executePutBlock();
+    }
+  }
+
+ void cleanupAll() {
+ super.cleanup();
+ if (finishedStreamEntries != null) {
+ finishedStreamEntries.clear();
+ }
+ }
+
+ public void updateToNextStream(int rotation) {
+ super.setCurrIdx((getCurrIdx() + 1) % rotation);
+ }
+
+}
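Note how getKeyLength() above filters entries to replica index <= d: parity streams receive a full cell per stripe just like data streams, but only data bytes count towards the logical key length. A JDK-only sketch of that accounting with d = 3, p = 2 (illustrative names and values):

import java.util.Arrays;
import java.util.List;

/** Illustrative only: why the pool filters replica index <= d. */
public class KeyLengthDemo {
  static final class StripeStream {
    final int replicaIndex;    // 1..d are data, d+1..d+p are parity
    final long bytesWritten;

    StripeStream(int replicaIndex, long bytesWritten) {
      this.replicaIndex = replicaIndex;
      this.bytesWritten = bytesWritten;
    }
  }

  public static void main(String[] args) {
    int d = 3;
    List<StripeStream> group = Arrays.asList(
        new StripeStream(1, 1024), new StripeStream(2, 1024),
        new StripeStream(3, 1024),
        new StripeStream(4, 1024), new StripeStream(5, 1024));
    long keyLength = group.stream()
        .filter(s -> s.replicaIndex <= d)    // data streams only
        .mapToLong(s -> s.bytesWritten)
        .sum();
    System.out.println(keyLength);           // 3072, not 5120
  }
}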
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
new file mode 100644
index 0000000..474dcab
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ECKeyOutputStream handles the EC writes by writing the data into underlying
+ * block output streams chunk by chunk.
+ */
+public class ECKeyOutputStream extends KeyOutputStream {
+ private OzoneClientConfig config;
+ private ECChunkBuffers ecChunkBufferCache;
+ private int ecChunkSize = 1024;
+ private final int numDataBlks;
+ private final int numParityBlks;
+  private static final ByteBufferPool BUFFER_POOL =
+      new ElasticByteBufferPool();
+ private final RawErasureEncoder encoder;
+  // TODO: EC: Currently using the below EC Schema. This has to be modified
+  // and created dynamically once OM returns the configured schema details.
+  private static final String DEFAULT_CODEC_NAME = "rs";
+
+ private long currentBlockGroupLen = 0;
+ /**
+ * Defines stream action while calling handleFlushOrClose.
+ */
+ enum StreamAction {
+ FLUSH, CLOSE, FULL
+ }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ECKeyOutputStream.class);
+
+ private boolean closed;
+ private FileEncryptionInfo feInfo;
+  // how much data has actually been written to the underlying streams
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception was encountered during the write, in which case
+  // the whole write could not succeed
+  private boolean isException;
+ private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
+ @VisibleForTesting
+ public List<BlockOutputStreamEntry> getStreamEntries() {
+ return blockOutputStreamEntryPool.getStreamEntries();
+ }
+
+ @VisibleForTesting
+ public XceiverClientFactory getXceiverClientFactory() {
+ return blockOutputStreamEntryPool.getXceiverClientFactory();
+ }
+
+ @VisibleForTesting
+ public List<OmKeyLocationInfo> getLocationInfoList() {
+ return blockOutputStreamEntryPool.getLocationInfoList();
+ }
+
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
+ public ECKeyOutputStream(OzoneClientConfig config, OpenKeySession handler,
+ XceiverClientFactory xceiverClientManager, OzoneManagerProtocol omClient,
+ int chunkSize, String requestId, ReplicationConfig replicationConfig,
+ String uploadID, int partNumber, boolean isMultipart,
+ boolean unsafeByteBufferConversion) {
+ this.config = config;
+ // For EC, cell/chunk size and buffer size can be same for now.
+ this.config.setStreamBufferMaxSize(ecChunkSize);
+ this.config.setStreamBufferFlushSize(ecChunkSize);
+ this.config.setStreamBufferSize(ecChunkSize);
+ assert replicationConfig instanceof ECReplicationConfig;
+ this.numDataBlks = ((ECReplicationConfig) replicationConfig).getData();
+ this.numParityBlks = ((ECReplicationConfig) replicationConfig).getParity();
+ ecChunkBufferCache =
+ new ECChunkBuffers(ecChunkSize, numDataBlks, numParityBlks);
+ OmKeyInfo info = handler.getKeyInfo();
+ blockOutputStreamEntryPool =
+ new ECBlockOutputStreamEntryPool(config, omClient, requestId,
+ replicationConfig, uploadID, partNumber, isMultipart, info,
+ unsafeByteBufferConversion, xceiverClientManager, handler.getId());
+
+ // Retrieve the file encryption key info, null if file is not in
+ // encrypted bucket.
+ this.feInfo = info.getFileEncryptionInfo();
+ this.isException = false;
+ this.writeOffset = 0;
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ECSchema schema =
+ new ECSchema(DEFAULT_CODEC_NAME, numDataBlks, numParityBlks);
+ ErasureCodecOptions options = new ErasureCodecOptions(schema);
+ RSErasureCodec codec = new RSErasureCodec(conf, options);
+ this.encoder = CodecUtil.createRawEncoder(conf,
+ SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+ codec.getCoderOptions());
+ }
+
+ /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But a key's
+   * version also includes blocks from previous versions, and we need to
+   * avoid adding these old blocks to stream entries, because these old
+   * blocks should not be picked for write. To do this, the following method
+   * checks that only those blocks created in this particular open version
+   * are added to stream entries.
+ *
+ * @param version the set of blocks that are pre-allocated.
+ * @param openVersion the version corresponding to the pre-allocation.
+ * @throws IOException
+ */
+ public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+ long openVersion) throws IOException {
+ blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+ }
+
+ /**
+   * Tries to write the byte sequence b[off:off+len) to the underlying EC
+   * block streams.
+ *
+ * @param b byte data
+ * @param off starting offset
+ * @param len length to write
+ * @throws IOException
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkNotClosed();
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+ || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return;
+ }
+
+ int currentChunkBufferRemainingLength =
+ ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
+ .remaining();
+ int currentChunkBufferLen =
+ ecChunkBufferCache.dataBuffers[blockOutputStreamEntryPool.getCurrIdx()]
+ .position();
+ int maxLenToCurrChunkBuffer = (int) Math.min(len, ecChunkSize);
+ int currentWriterChunkLenToWrite =
+ Math.min(currentChunkBufferRemainingLength, maxLenToCurrChunkBuffer);
+    handleWrite(b, off, currentWriterChunkLenToWrite,
+        currentChunkBufferLen + currentWriterChunkLenToWrite == ecChunkSize,
+        false);
+    checkAndWriteParityCells();
+    // advance the offset past the bytes just written, so that the stripe
+    // loop below does not rewrite them
+    off += currentWriterChunkLenToWrite;
+
+    int remLen = len - currentWriterChunkLenToWrite;
+    int iters = remLen / ecChunkSize;
+    int lastCellSize = remLen % ecChunkSize;
+    while (iters > 0) {
+      handleWrite(b, off, ecChunkSize, true, false);
+      off += ecChunkSize;
+      iters--;
+      checkAndWriteParityCells();
+    }
+
+ if (lastCellSize > 0) {
+ handleWrite(b, off, lastCellSize, false, false);
+ checkAndWriteParityCells();
+ }
+ writeOffset += len;
+ }
+
+  private void checkAndWriteParityCells() throws IOException {
+    // Once the current index reaches the number of data blocks, a full
+    // stripe of data cells has been buffered, so encode and write the
+    // parity cells.
+    if (blockOutputStreamEntryPool.getCurrIdx() == numDataBlks) {
+      writeParityCells();
+ // By this time, we should have finished full stripe. So, lets call
+ // executePutBlock for all.
+ // TODO: we should alter the put block calls to share CRC to each stream.
+ blockOutputStreamEntryPool.executePutBlockForAll();
+ ecChunkBufferCache.clear();
+
+      // check whether the block group ends here
+ if (currentBlockGroupLen == numDataBlks * blockOutputStreamEntryPool
+ .getStreamEntries().get(blockOutputStreamEntryPool.getCurrIdx())
+ .getLength()) {
+ blockOutputStreamEntryPool.endECBlock();
+ currentBlockGroupLen = 0;
+ }
+ }
+ }
+
+ void writeParityCells() throws IOException {
+ final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
+ //encode the data cells
+ for (int i = 0; i < numDataBlks; i++) {
+ buffers[i].flip();
+ }
+
+ final ByteBuffer[] parityBuffers = ecChunkBufferCache.getParityBuffers();
+ encoder.encode(buffers, parityBuffers);
+    for (int i = numDataBlks; i < (numDataBlks + numParityBlks); i++) {
+ handleWrite(parityBuffers[i - numDataBlks].array(), 0, ecChunkSize, true,
+ true);
+ }
+ }
+
+ private void handleWrite(byte[] b, int off, long len, boolean isFullCell,
+ boolean isParity) throws IOException {
+ if (!isParity) {
+ ecChunkBufferCache
+ .addToDataBuffer(blockOutputStreamEntryPool.getCurrIdx(), b, off,
+ (int) len);
+ }
+ BlockOutputStreamEntry current =
+ blockOutputStreamEntryPool.allocateBlockIfNeeded();
+ int writeLengthToCurrStream =
+ Math.min((int) len, (int) current.getRemaining());
+ currentBlockGroupLen += isParity ? 0 : writeLengthToCurrStream;
+ if (current.getRemaining() <= 0) {
+      // the current internal block is fully written, so close its stream
+ closeCurrentStream(StreamAction.CLOSE);
+ }
+
+ len -= writeLengthToCurrStream;
+ if (isFullCell) {
+ ByteBuffer bytesToWrite = isParity ?
+ ecChunkBufferCache.getParityBuffers()[blockOutputStreamEntryPool
+ .getCurrIdx() - numDataBlks] :
+ ecChunkBufferCache.getDataBuffers()[blockOutputStreamEntryPool
+ .getCurrIdx()];
+ try {
+ writeToOutputStream(current, len, bytesToWrite.array(),
+ bytesToWrite.array().length, 0, current.getWrittenDataLength(),
+ isParity);
+ } catch (Exception e) {
+ markStreamClosed();
+ }
+
+ blockOutputStreamEntryPool
+ .updateToNextStream(numDataBlks + numParityBlks);
+ }
+ }
+
+ private int writeToOutputStream(BlockOutputStreamEntry current, long len,
+ byte[] b, int writeLen, int off, long currentPos, boolean isParity)
+ throws IOException {
+ try {
+ current.write(b, off, writeLen);
+ if (!isParity) {
+ offset += writeLen;
+ }
+ } catch (IOException ioe) {
+ // for the current iteration, totalDataWritten - currentPos gives the
+ // amount of data already written to the buffer
+
+ // In the retryPath, the total data to be written will always be equal
+ // to or less than the max length of the buffer allocated.
+ // The len specified here is the combined sum of the data length of
+ // the buffers
+ Preconditions.checkState(len <= config.getStreamBufferMaxSize());
+ int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+ writeLen = dataWritten;
+
+ if (!isParity) {
+ offset += writeLen;
+ }
+ LOG.debug("writeLen {}, total len {}", writeLen, len);
+ handleException(current, ioe);
+ }
+ return writeLen;
+ }
+
+ private void handleException(BlockOutputStreamEntry streamEntry,
+ IOException exception) throws IOException {
+ Throwable t = HddsClientUtils.checkForException(exception);
+ Preconditions.checkNotNull(t);
+ // In EC, we will just close the current stream.
+ streamEntry.close();
+ }
+
+ private void markStreamClosed() {
+ blockOutputStreamEntryPool.cleanup();
+ closed = true;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ checkNotClosed();
+ handleFlushOrClose(StreamAction.FLUSH);
+ }
+
+ /**
+   * Close or flush the latest outputStream depending upon the action.
+   * This function gets called when, while a write is in progress, the
+   * current stream gets full, or when the client makes an explicit flush
+   * or close request.
+ *
+ * @param op Flag which decides whether to call close or flush on the
+ * outputStream.
+ * @throws IOException In case, flush or close fails with exception.
+ */
+ @SuppressWarnings("squid:S1141")
+ private void handleFlushOrClose(StreamAction op) throws IOException {
+ if (!blockOutputStreamEntryPool.isEmpty()) {
+ while (true) {
+ try {
+ BlockOutputStreamEntry entry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ if (entry != null) {
+ try {
+ handleStreamAction(entry, op);
+ } catch (IOException ioe) {
+ handleException(entry, ioe);
+ continue;
+ }
+ }
+ return;
+ } catch (Exception e) {
+ markStreamClosed();
+ throw e;
+ }
+ }
+ }
+ }
+
+ private void handleFlushOrCloseAllStreams(StreamAction op)
+ throws IOException {
+ if (!blockOutputStreamEntryPool.isEmpty()) {
+ List<BlockOutputStreamEntry> allStreamEntries =
+ blockOutputStreamEntryPool.getStreamEntries();
+ for (int i = 0; i < allStreamEntries.size(); i++) {
+ while (true) {
+ try {
+ BlockOutputStreamEntry entry = allStreamEntries.get(i);
+ if (entry != null) {
+ try {
+ handleStreamAction(entry, op);
+ } catch (IOException ioe) {
+ handleException(entry, ioe);
+ continue;
+ }
+ }
+            break;
+ } catch (Exception e) {
+ markStreamClosed();
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+ private void closeCurrentStream(StreamAction op) throws IOException {
+ if (!blockOutputStreamEntryPool.isEmpty()) {
+ List<BlockOutputStreamEntry> allStreamEntries =
+ blockOutputStreamEntryPool.getStreamEntries();
+ for (int i = 0; i < allStreamEntries.size(); i++) {
+ while (true) {
+ try {
+ BlockOutputStreamEntry entry = allStreamEntries.get(i);
+ if (entry != null) {
+ try {
+ handleStreamAction(entry, op);
+ } catch (IOException ioe) {
+ handleException(entry, ioe);
+ continue;
+ }
+ }
+            break;
+ } catch (Exception e) {
+ markStreamClosed();
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
+  private void handleStreamAction(BlockOutputStreamEntry entry,
+      StreamAction op)
+ throws IOException {
+ Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+ // failed servers can be null in case there is no data written in
+ // the stream
+ if (!failedServers.isEmpty()) {
+ blockOutputStreamEntryPool.getExcludeList().addDatanodes(failedServers);
+ }
+ switch (op) {
+ case CLOSE:
+ entry.close();
+ break;
+ case FULL:
+ if (entry.getRemaining() == 0) {
+ entry.close();
+ }
+ break;
+ case FLUSH:
+ entry.flush();
+ break;
+ default:
+ throw new IOException("Invalid Operation");
+ }
+ }
+
+ /**
+   * Commit the key to OM; this will add the blocks as the new key blocks.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ handleFlushOrCloseAllStreams(StreamAction.CLOSE);
+ if (!isException) {
+ Preconditions.checkArgument(writeOffset == offset);
+ }
+ blockOutputStreamEntryPool.endECBlock();
+ blockOutputStreamEntryPool.commitKey(offset);
+ } finally {
+ blockOutputStreamEntryPool.cleanupAll();
+ }
+ ecChunkBufferCache.release();
+ }
+
+ public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+ return blockOutputStreamEntryPool.getCommitUploadPartInfo();
+ }
+
+ public FileEncryptionInfo getFileEncryptionInfo() {
+ return feInfo;
+ }
+
+ @VisibleForTesting
+ public ExcludeList getExcludeList() {
+ return blockOutputStreamEntryPool.getExcludeList();
+ }
+
+ /**
+ * Builder class of ECKeyOutputStream.
+ */
+ public static class Builder {
+ private OpenKeySession openHandler;
+ private XceiverClientFactory xceiverManager;
+ private OzoneManagerProtocol omClient;
+ private int chunkSize;
+ private String requestID;
+ private String multipartUploadID;
+ private int multipartNumber;
+ private boolean isMultipartKey;
+ private boolean unsafeByteBufferConversion;
+ private OzoneClientConfig clientConfig;
+ private ReplicationConfig replicationConfig;
+
+ public Builder setMultipartUploadID(String uploadID) {
+ this.multipartUploadID = uploadID;
+ return this;
+ }
+
+ public Builder setMultipartNumber(int partNumber) {
+ this.multipartNumber = partNumber;
+ return this;
+ }
+
+ public Builder setHandler(OpenKeySession handler) {
+ this.openHandler = handler;
+ return this;
+ }
+
+ public Builder setXceiverClientManager(XceiverClientFactory manager) {
+ this.xceiverManager = manager;
+ return this;
+ }
+
+ public Builder setOmClient(OzoneManagerProtocol client) {
+ this.omClient = client;
+ return this;
+ }
+
+ public Builder setChunkSize(int size) {
+ this.chunkSize = size;
+ return this;
+ }
+
+ public Builder setRequestID(String id) {
+ this.requestID = id;
+ return this;
+ }
+
+ public Builder setIsMultipartKey(boolean isMultipart) {
+ this.isMultipartKey = isMultipart;
+ return this;
+ }
+
+ public Builder setConfig(OzoneClientConfig config) {
+ this.clientConfig = config;
+ return this;
+ }
+
+ public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+ this.unsafeByteBufferConversion = enabled;
+ return this;
+ }
+
+ public ECKeyOutputStream.Builder setReplicationConfig(
+ ReplicationConfig replConfig) {
+ this.replicationConfig = replConfig;
+ return this;
+ }
+
+ public ECKeyOutputStream build() {
+ return new ECKeyOutputStream(clientConfig, openHandler, xceiverManager,
+ omClient, chunkSize, requestID, replicationConfig, multipartUploadID,
+ multipartNumber, isMultipartKey, unsafeByteBufferConversion);
+ }
+ }
+
+ /**
+ * Verify that the output stream is open. Non blocking; this gives
+ * the last state of the volatile {@link #closed} field.
+ *
+ * @throws IOException if the connection is closed.
+ */
+ private void checkNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException(
+ ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+ + blockOutputStreamEntryPool.getKeyName());
+ }
+ }
+
+ private static class ECChunkBuffers {
+ private final ByteBuffer[] dataBuffers;
+ private final ByteBuffer[] parityBuffers;
+ private final int dataBlks;
+ private final int parityBlks;
+ private int cellSize;
+
+ ECChunkBuffers(int cellSize, int numData, int numParity) {
+ this.cellSize = cellSize;
+ this.parityBlks = numParity;
+ this.dataBlks = numData;
+ dataBuffers = new ByteBuffer[this.dataBlks];
+ parityBuffers = new ByteBuffer[this.parityBlks];
+ allocateBuffers(cellSize, dataBuffers);
+ allocateBuffers(cellSize, parityBuffers);
+ }
+
+ private ByteBuffer[] getDataBuffers() {
+ return dataBuffers;
+ }
+
+ private ByteBuffer[] getParityBuffers() {
+ return parityBuffers;
+ }
+
+ private int addToDataBuffer(int i, byte[] b, int off, int len) {
+ final ByteBuffer buf = dataBuffers[i];
+ final int pos = buf.position() + len;
+ Preconditions.checkState(pos <= cellSize);
+ buf.put(b, off, len);
+ return pos;
+ }
+
+ private void clear() {
+ clearBuffers(cellSize, dataBuffers);
+ clearBuffers(cellSize, parityBuffers);
+ }
+
+ private void release() {
+ releaseBuffers(dataBuffers);
+ releaseBuffers(parityBuffers);
+ }
+
+ private static void allocateBuffers(int cellSize, ByteBuffer[] buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ buffers[i] = BUFFER_POOL.getBuffer(false, cellSize);
+ buffers[i].limit(cellSize);
+ }
+ }
+
+ private static void clearBuffers(int cellSize, ByteBuffer[] buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ buffers[i].clear();
+ buffers[i].limit(cellSize);
+ }
+ }
+
+ private static void releaseBuffers(ByteBuffer[] buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ if (buffers[i] != null) {
+ BUFFER_POOL.putBuffer(buffers[i]);
+ buffers[i] = null;
+ }
+ }
+ }
+ }
+}
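The parity path reduces to the RawErasureEncoder calls used above: flip the d data cell buffers, then encode them into p parity buffers. The sketch below isolates just that step using the same hadoop-hdfs-client APIs this patch takes a dependency on (it mirrors the encoder setup in TestOzoneECClient at the end of this patch; treat it as a standalone illustration, not patch content):

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

/** Sketch of the encode step in writeParityCells(), standalone. */
public class ParityEncodeDemo {
  public static void main(String[] args) throws Exception {
    int d = 3, p = 2, cellSize = 1024;
    Configuration conf = new Configuration();
    ECSchema schema = new ECSchema("rs", d, p);
    RSErasureCodec codec =
        new RSErasureCodec(conf, new ErasureCodecOptions(schema));
    RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf,
        SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
        codec.getCoderOptions());

    ByteBuffer[] data = new ByteBuffer[d];
    for (int i = 0; i < d; i++) {
      data[i] = ByteBuffer.allocate(cellSize);
      while (data[i].hasRemaining()) {
        data[i].put((byte) (i + 1));         // fill the cell with marker bytes
      }
      data[i].flip();                        // prepare for encode
    }
    ByteBuffer[] parity = new ByteBuffer[p];
    for (int i = 0; i < p; i++) {
      parity[i] = ByteBuffer.allocate(cellSize);
    }
    encoder.encode(data, parity);            // p parity cells from d data cells
    System.out.println("parity[0] first byte: " + parity[0].get(0));
  }
}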
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index b49d05d..87da492 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -1365,16 +1366,25 @@ public class RpcClient implements ClientProtocol {
private OzoneOutputStream createOutputStream(OpenKeySession openKey,
String requestId, ReplicationConfig replicationConfig)
throws IOException {
- KeyOutputStream keyOutputStream =
- new KeyOutputStream.Builder()
- .setHandler(openKey)
- .setXceiverClientManager(xceiverClientManager)
- .setOmClient(ozoneManagerClient)
- .setRequestID(requestId)
- .setReplicationConfig(replicationConfig)
- .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(clientConfig)
- .build();
+    KeyOutputStream keyOutputStream;
+
+ if (openKey.getKeyInfo().getReplicationConfig()
+ .getReplicationType() == HddsProtos.ReplicationType.EC) {
+ keyOutputStream = new ECKeyOutputStream.Builder().setHandler(openKey)
+ .setXceiverClientManager(xceiverClientManager)
+ .setOmClient(ozoneManagerClient).setRequestID(requestId)
+ .setReplicationConfig(replicationConfig)
+ .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+ .setConfig(clientConfig).build();
+ } else {
+ keyOutputStream = new KeyOutputStream.Builder().setHandler(openKey)
+ .setXceiverClientManager(xceiverClientManager)
+ .setOmClient(ozoneManagerClient).setRequestID(requestId)
+ .setReplicationConfig(replicationConfig)
+ .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+ .setConfig(clientConfig).build();
+ }
+
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
index cb9875b..73ab96b 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockDatanodeStorage.java
@@ -48,7 +48,8 @@ public class MockDatanodeStorage {
public void writeChunk(
DatanodeBlockID blockID,
ChunkInfo chunkInfo, ByteString bytes) {
- data.put(createKey(blockID, chunkInfo), bytes);
+ data.put(createKey(blockID, chunkInfo),
+ ByteString.copyFrom(bytes.toByteArray()));
chunks.put(createKey(blockID, chunkInfo), chunkInfo);
}
@@ -70,4 +71,8 @@ public class MockDatanodeStorage {
+ chunkInfo.getChunkName() + "_" + chunkInfo.getOffset();
}
+  public Map<String, ByteString> getAllBlockData() {
+ return this.data;
+ }
+
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index 17a7f6b..f38ee35 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -258,6 +258,10 @@ public class MockOmTransport implements OmTransport {
return CreateBucketResponse.newBuilder().build();
}
+  public Map<String, Map<String, Map<String, KeyInfo>>> getKeys() {
+ return this.keys;
+ }
+
@Override
public Text getDelegationTokenService() {
return null;
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
index a5fa2bb..da72651 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientFactory.java
@@ -67,6 +67,13 @@ public class MockXceiverClientFactory
boolean b) {
}
-};
+
+  /**
+   * Returns the mock storage for each datanode.
+   */
+ public Map<DatanodeDetails, MockDatanodeStorage> getStorages() {
+ return this.storage;
+ }
+}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
new file mode 100644
index 0000000..7ba8adc
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MultiNodePipelineBlockAllocator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allocates the block with required number of nodes in the pipeline.
+ */
+public class MultiNodePipelineBlockAllocator implements MockBlockAllocator {
+ private long blockId;
+ private HddsProtos.Pipeline pipeline;
+ private int requiredNodes;
+
+ public MultiNodePipelineBlockAllocator(int requiredNodes) {
+ this.requiredNodes = requiredNodes;
+ }
+
+ @Override
+ public Iterable<? extends OzoneManagerProtocolProtos.KeyLocation>
+ allocateBlock(OzoneManagerProtocolProtos.KeyArgs keyArgs) {
+ if (pipeline == null) {
+ HddsProtos.Pipeline.Builder builder =
+ HddsProtos.Pipeline.newBuilder().setFactor(keyArgs.getFactor())
+ .setType(keyArgs.getType()).setId(
+ HddsProtos.PipelineID.newBuilder().setUuid128(
+ HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
+ .setMostSigBits(1L).build()).build());
+
+ for (int i = 1; i <= requiredNodes; i++) {
+ builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder()
+ .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(i)
+ .setMostSigBits(i).build()).setHostName("localhost")
+ .setIpAddress("1.2.3.4").addPorts(
+                HddsProtos.Port.newBuilder().setName("RATIS").setValue(1234 + i)
+ .build()).build());
+ }
+ pipeline = builder.build();
+ }
+
+ List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
+ results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
+ .setPipeline(pipeline).setBlockID(
+ HddsProtos.BlockID.newBuilder().setBlockCommitSequenceId(1L)
+ .setContainerBlockID(
+ HddsProtos.ContainerBlockID.newBuilder().setContainerID(1L)
+ .setLocalID(blockId++).build()).build()).setOffset(0L)
+ .setLength(keyArgs.getDataSize()).build());
+ return results;
+ }
+}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
new file mode 100644
index 0000000..56e7d34
--- /dev/null
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
+import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Real unit test for OzoneECClient.
+ * <p>
+ * Used for testing Ozone client without external network calls.
+ */
+public class TestOzoneECClient {
+ private int chunkSize = 1024;
+ private int dataBlocks = 3;
+ private int parityBlocks = 2;
+ private OzoneClient client;
+ private ObjectStore store;
+ private String keyName = UUID.randomUUID().toString();
+ private byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+ private final XceiverClientFactory factoryStub =
+ new MockXceiverClientFactory();
+ private final MockOmTransport transportStub = new MockOmTransport(
+ new MultiNodePipelineBlockAllocator(dataBlocks + parityBlocks));
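+ // The stubbed OM hands out blocks on a single five-node (3 data + 2
+ // parity) pipeline, so writes never leave the JVM.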
+ private ECSchema schema = new ECSchema("rs", dataBlocks, parityBlocks);
+ private ErasureCodecOptions options = new ErasureCodecOptions(schema);
+ private OzoneConfiguration conf = new OzoneConfiguration();
+ private RSErasureCodec codec = new RSErasureCodec(conf, options);
+ private final RawErasureEncoder encoder = CodecUtil.createRawEncoder(conf,
+ SystemErasureCodingPolicies.getPolicies().get(1).getCodecName(),
+ codec.getCoderOptions());
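+ // Raw RS encoder used to recompute parity locally for verification;
+ // system EC policy index 1 is presumably RS-3-2 here, matching the
+ // dataBlocks/parityBlocks fields above.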
+
+ @Before
+ public void init() throws IOException {
+ ConfigurationSource config = new InMemoryConfiguration();
+ client = new OzoneClient(config, new RpcClient(config, null) {
+
+ @Override
+ protected OmTransport createOmTransport(String omServiceId)
+ throws IOException {
+ return transportStub;
+ }
+
+ @Override
+ protected XceiverClientFactory createXceiverClientFactory(
+ List<X509Certificate> x509Certificates) throws IOException {
+ return factoryStub;
+ }
+ });
+
+ store = client.getObjectStore();
+ initInputChunks();
+ }
+
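+ // Fill chunk i entirely with the digit (i + 1) so each datanode's
+ // stored bytes are easy to tell apart in the assertions.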
+ private void initInputChunks() {
+ for (int i = 0; i < dataBlocks; i++) {
+ inputChunks[i] = getBytesWith(i + 1, chunkSize);
+ }
+ }
+
+ private byte[] getBytesWith(int singleDigitNumber, int total) {
+ // StringBuilder(int) sets the initial capacity, not the contents.
+ StringBuilder builder = new StringBuilder(total);
+ for (int i = 1; i <= total; i++) {
+ builder.append(singleDigitNumber);
+ }
+ return builder.toString().getBytes(UTF_8);
+ }
+
+ @After
+ public void close() throws IOException {
+ client.close();
+ }
+
+ @Test
+ public void testPutECKeyAndCheckDNStoredData() throws IOException {
+ OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ Map<DatanodeDetails, MockDatanodeStorage> storages =
+ ((MockXceiverClientFactory) factoryStub).getStorages();
+ DatanodeDetails[] dnDetails =
+ storages.keySet().toArray(new DatanodeDetails[storages.size()]);
+ Arrays.sort(dnDetails);
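+ // With EC striping, input chunk i should have been written verbatim to
+ // the i-th datanode of the sorted pipeline.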
+ for (int i = 0; i < inputChunks.length; i++) {
+ MockDatanodeStorage datanodeStorage = storages.get(dnDetails[i]);
+ Assert.assertEquals(1, datanodeStorage.getAllBlockData().size());
+ ByteString content =
+ datanodeStorage.getAllBlockData().values().iterator().next();
+ Assert.assertEquals(new String(inputChunks[i], UTF_8),
+ content.toStringUtf8());
+ }
+ }
+
+ @Test
+ public void testPutECKeyAndCheckParityData() throws IOException {
+ OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+ final ByteBuffer[] dataBuffers = new ByteBuffer[dataBlocks];
+ for (int i = 0; i < inputChunks.length; i++) {
+ dataBuffers[i] = ByteBuffer.wrap(inputChunks[i]);
+ }
+ final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
+ for (int i = 0; i < parityBlocks; i++) {
+ parityBuffers[i] = ByteBuffer.allocate(chunkSize);
+ }
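+ // Recompute the expected parity with the same RS coder the client
+ // uses; the datanode contents are then compared against these buffers.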
+ encoder.encode(dataBuffers, parityBuffers);
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ Map<DatanodeDetails, MockDatanodeStorage> storages =
+ ((MockXceiverClientFactory) factoryStub).getStorages();
+ DatanodeDetails[] dnDetails =
+ storages.keySet().toArray(new DatanodeDetails[storages.size()]);
+ Arrays.sort(dnDetails);
+
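+ // Parity chunks are expected on the last parityBlocks nodes of the
+ // pipeline, after the dataBlocks data nodes.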
+ for (int i = dataBlocks; i < parityBlocks + dataBlocks; i++) {
+ MockDatanodeStorage datanodeStorage = storages.get(dnDetails[i]);
+ Assert.assertEquals(1, datanodeStorage.getAllBlockData().size());
+ ByteString content =
+ datanodeStorage.getAllBlockData().values().iterator().next();
+ Assert.assertEquals(
+ new String(parityBuffers[i - dataBlocks].array(), UTF_8),
+ content.toStringUtf8());
+ }
+ }
+
+ @Test
+ public void testPutECKeyAndReadContent() throws IOException {
+ OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[chunkSize];
+ Assert.assertEquals(inputChunks[0].length, is.read(fileContent));
+ Assert.assertEquals(new String(inputChunks[0], UTF_8),
+ new String(fileContent, UTF_8));
+ }
+
+ // Since EC read is not ready yet, use the regular read path by
+ // tweaking the pipeline: keep only the node at position 2, so the
+ // regular read hits it and returns the second chunk of the EC data.
+ updatePipelineToKeepSingleNode(2);
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[chunkSize];
+ Assert.assertEquals(inputChunks[1].length, is.read(fileContent));
+ Assert.assertEquals(new String(inputChunks[1], UTF_8),
+ new String(fileContent, UTF_8));
+ }
+
+ updatePipelineToKeepSingleNode(3);
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ byte[] fileContent = new byte[chunkSize];
+ Assert.assertEquals(inputChunks[2].length, is.read(fileContent));
+ Assert.assertEquals(new String(inputChunks[2], UTF_8),
+ new String(fileContent, UTF_8));
+ }
+ }
+
+ private OzoneBucket writeIntoECKey(byte[][] chunks, String key)
+ throws IOException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
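+ // Each write below is exactly one chunk; ECKeyOutputStream stripes the
+ // chunks across the data nodes and writes the computed parity alongside.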
+ try (OzoneOutputStream out = bucket.createKey(key, 2000,
+ new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+ for (int i = 0; i < chunks.length; i++) {
+ out.write(chunks[i]);
+ }
+ }
+ return bucket;
+ }
+
+ private void updatePipelineToKeepSingleNode(int keepingNodeIndex) {
+ Map<String, Map<String, Map<String, OzoneManagerProtocolProtos.KeyInfo>>>
+ keys = ((MockOmTransport) transportStub).getKeys();
+ Map<String, Map<String, OzoneManagerProtocolProtos.KeyInfo>> vol =
+ keys.get(keys.keySet().iterator().next());
+
+ Map<String, OzoneManagerProtocolProtos.KeyInfo> buck =
+ vol.get(vol.keySet().iterator().next());
+ OzoneManagerProtocolProtos.KeyInfo keyInfo =
+ buck.get(buck.keySet().iterator().next());
+ HddsProtos.Pipeline.Builder builder =
+ HddsProtos.Pipeline.newBuilder().setFactor(keyInfo.getFactor())
+ .setType(keyInfo.getType()).setId(HddsProtos.PipelineID.newBuilder()
+ .setUuid128(HddsProtos.UUID.newBuilder().setLeastSigBits(1L)
+ .setMostSigBits(1L).build()).build());
+
+ // Keeping only the given position node in pipeline.
+ builder.addMembers(HddsProtos.DatanodeDetailsProto.newBuilder().setUuid128(
+ HddsProtos.UUID.newBuilder().setLeastSigBits(keepingNodeIndex)
+ .setMostSigBits(keepingNodeIndex).build()).setHostName("localhost")
+ .setIpAddress("1.2.3.4").addPorts(
+ HddsProtos.Port.newBuilder().setName("EC")
+ .setValue(1234 + keepingNodeIndex).build()).build());
+
+ HddsProtos.Pipeline pipeline = builder.build();
+ List<OzoneManagerProtocolProtos.KeyLocation> results = new ArrayList<>();
+ results.add(OzoneManagerProtocolProtos.KeyLocation.newBuilder()
+ .setPipeline(pipeline).setBlockID(
+ HddsProtos.BlockID.newBuilder().setBlockCommitSequenceId(1L)
+ .setContainerBlockID(
+ HddsProtos.ContainerBlockID.newBuilder().setContainerID(1L)
+ .setLocalID(0L).build()).build()).setOffset(0L)
+ .setLength(keyInfo.getDataSize()).build());
+
+ final OzoneManagerProtocolProtos.KeyInfo keyInfo1 =
+ OzoneManagerProtocolProtos.KeyInfo.newBuilder()
+ .setVolumeName(keyInfo.getVolumeName())
+ .setBucketName(keyInfo.getBucketName())
+ .setKeyName(keyInfo.getKeyName())
+ .setCreationTime(keyInfo.getCreationTime())
+ .setModificationTime(keyInfo.getModificationTime())
+ .setType(keyInfo.getType()).setFactor(keyInfo.getFactor())
+ .setDataSize(keyInfo.getDataSize()).setLatestVersion(0L)
+ .addKeyLocationList(
+ OzoneManagerProtocolProtos.KeyLocationList.newBuilder()
+ .addAllKeyLocations(results)).build();
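+ // Swap the stored key info so subsequent reads resolve the rewritten
+ // single-node pipeline.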
+ buck.put(keyInfo.getKeyName(), keyInfo1);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]