This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ab923a3538 HDDS-6794. EC: Analyze and add putBlock even on non writing
node in the case of partial single stripe. (#3514)
ab923a3538 is described below
commit ab923a3538f10727631d1e044299965294eb6864
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Jul 7 08:49:29 2022 -0700
HDDS-6794. EC: Analyze and add putBlock even on non writing node in the
case of partial single stripe. (#3514)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 13 +++++++--
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 24 ++++++++++++----
.../hdds/scm/storage/ECBlockOutputStream.java | 1 +
.../hadoop/ozone/client/io/ECBlockInputStream.java | 4 ++-
.../hdds/scm/storage/ContainerProtocolCalls.java | 4 +--
.../container/common/impl/HddsDispatcher.java | 16 +++++++++--
.../container/keyvalue/KeyValueContainerCheck.java | 10 +++++--
.../ozone/container/keyvalue/KeyValueHandler.java | 6 +++-
.../replication/ECContainerReplicaCount.java | 3 +-
.../ozone/client/io/ECBlockOutputStreamEntry.java | 32 ++++++++++------------
.../hadoop/ozone/client/TestOzoneECClient.java | 7 ++++-
.../hdds/scm/storage/TestContainerCommandsEC.java | 7 +++--
.../ozone/client/rpc/TestECKeyOutputStream.java | 7 +++--
13 files changed, 92 insertions(+), 42 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 07a444a248..a0a210dd58 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -229,10 +229,17 @@ public class BlockInputStream extends
BlockExtendedInputStream {
blockID.getContainerID());
}
- DatanodeBlockID datanodeBlockID = blockID
- .getDatanodeBlockIDProtobuf();
+ DatanodeBlockID.Builder blkIDBuilder =
+ DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
+ .setLocalID(blockID.getLocalID())
+ .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
+
+ int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
+ if (replicaIndex > 0) {
+ blkIDBuilder.setReplicaIndex(replicaIndex);
+ }
GetBlockResponseProto response = ContainerProtocolCalls
- .getBlock(xceiverClient, datanodeBlockID, token);
+ .getBlock(xceiverClient, blkIDBuilder.build(), token);
chunks = response.getBlockData().getChunksList();
success = true;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index b86464c097..036b1007d7 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -140,17 +140,24 @@ public class BlockOutputStream extends OutputStream {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.blockID = new AtomicReference<>(blockID);
+ replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
- this.containerBlockData =
- BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
- .addMetadata(keyValue);
+
+ ContainerProtos.DatanodeBlockID.Builder blkIDBuilder =
+ ContainerProtos.DatanodeBlockID.newBuilder()
+ .setContainerID(blockID.getContainerID())
+ .setLocalID(blockID.getLocalID())
+ .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
+ if (replicationIndex > 0) {
+ blkIDBuilder.setReplicaIndex(replicationIndex);
+ }
+ this.containerBlockData = BlockData.newBuilder().setBlockID(
+ blkIDBuilder.build()).addMetadata(keyValue);
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
this.token = token;
- replicationIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
-
//number of buffers used before doing a flush
refreshCurrentBuffer();
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
@@ -268,11 +275,15 @@ public class BlockOutputStream extends OutputStream {
writeChunkIfNeeded();
off += writeLen;
len -= writeLen;
- writtenDataLength += writeLen;
+ updateWrittenDataLength(writeLen);
doFlushOrWatchIfNeeded();
}
}
+ public void updateWrittenDataLength(int writeLen) {
+ writtenDataLength += writeLen;
+ }
+
private void doFlushOrWatchIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
@@ -611,6 +622,7 @@ public class BlockOutputStream extends OutputStream {
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(),
e);
ioException.compareAndSet(null, exception);
+ LOG.debug("Exception: for block ID: " + blockID, e);
} else {
LOG.debug("Previous request had already failed with {} " +
"so subsequent request also encounters " +
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 174e507829..9f6bf05950 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -78,6 +78,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
public void write(byte[] b, int off, int len) throws IOException {
this.currentChunkRspFuture =
writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
+ updateWrittenDataLength(len);
}
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
write(
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 89be7839ac..40d454a0a1 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client.io;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -167,7 +168,8 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Arrays.asList(dataLocations[locationIndex]))
- .setId(PipelineID.randomId())
+ .setId(PipelineID.randomId()).setReplicaIndexes(
+ ImmutableMap.of(dataLocations[locationIndex], locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 16bef69712..e024d79b9a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -229,8 +229,8 @@ public final class ContainerProtocolCalls {
* @throws InterruptedException
* @throws ExecutionException
*/
- public static XceiverClientReply putBlockAsync(
- XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean
eof,
+ public static XceiverClientReply putBlockAsync(XceiverClientSpi
xceiverClient,
+ BlockData containerBlockData, boolean eof,
Token<? extends TokenIdentifier> token)
throws IOException, InterruptedException, ExecutionException {
PutBlockRequestProto.Builder createBlockRequest =
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 802104a171..60bb8e59bf 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -250,9 +250,14 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
/**
* Create Container should happen only as part of Write_Data phase of
* writeChunk.
+ * In EC, we are doing an empty putBlock. In partial stripe writes, if the
+ * file size is less than chunkSize*(ECData-1), we are writing an empty
+ * block to get the container created on the non-writing nodes. If the
+ * replica index is >0, then we know it is for an EC container.
*/
if (container == null && ((isWriteStage || isCombinedStage)
- || cmdType == Type.PutSmallFile)) {
+ || cmdType == Type.PutSmallFile
+ || cmdType == Type.PutBlock)) {
// If container does not exist, create one for WriteChunk and
// PutSmallFile request
responseProto = createContainer(msg);
@@ -264,7 +269,8 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
- || dispatcherContext == null);
+ || dispatcherContext == null
+ || cmdType == Type.PutBlock);
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
// with initial BCSID recorded as 0.
@@ -416,6 +422,12 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
containerRequest.getWriteChunk().getBlockID().getReplicaIndex());
}
+ if (containerRequest.hasPutBlock()) {
+ createRequest.setReplicaIndex(
+ containerRequest.getPutBlock().getBlockData().getBlockID()
+ .getReplicaIndex());
+ }
+
ContainerCommandRequestProto.Builder requestBuilder =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.CreateContainer)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index c560aabbe6..90c392bc01 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -250,9 +250,13 @@ public class KeyValueContainerCheck {
BlockData bdata = db.getStore()
.getBlockDataTable()
.get(blockKey);
- if (bdata != null) {
- throw new IOException("Missing chunk file "
- + chunkFile.getAbsolutePath());
+ // In EC, the client may write an empty putBlock on padding block
+ // nodes. So, we need to make sure the chunk length is > 0 before
+ // declaring the chunk file missing.
+ if (bdata != null && bdata.getChunks().size() > 0 && bdata
+ .getChunks().get(0).getLen() > 0) {
+ throw new IOException(
+ "Missing chunk file " + chunkFile.getAbsolutePath());
}
} else if (chunk.getChecksumData().getType()
!= ContainerProtos.ChecksumType.NONE) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 9590711650..1c79683326 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -465,7 +465,11 @@ public class KeyValueHandler extends Handler {
boolean endOfBlock = false;
if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
- chunkManager.finishWriteChunks(kvContainer, blockData);
+ // In EC, we will be doing an empty put block, so there may not be
+ // data available. Let's flush only when the data size is > 0.
+ if (request.getPutBlock().getBlockData().getSize() > 0) {
+ chunkManager.finishWriteChunks(kvContainer, blockData);
+ }
endOfBlock = true;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
index ff0a75053a..f62afc8b77 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECContainerReplicaCount.java
@@ -423,7 +423,8 @@ public class ECContainerReplicaCount implements
ContainerReplicaCount {
if (index < 1 || index > repConfig.getRequiredNodes()) {
throw new IllegalArgumentException("Replica Index in " + setName
+ " for containerID " + containerInfo.getContainerID()
- + "must be between 1 and " + repConfig.getRequiredNodes());
+ + "must be between 1 and " + repConfig.getRequiredNodes()
+ + ". But the given index is: " + index);
}
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 99cebf8217..13c6a8647a 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -90,26 +90,17 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
if (!isInitialized()) {
blockOutputStreams =
new ECBlockOutputStream[replicationConfig.getRequiredNodes()];
- }
- if (blockOutputStreams[currentStreamIdx] == null) {
- createOutputStream();
+ for (int i = currentStreamIdx; i < replicationConfig
+ .getRequiredNodes(); i++) {
+ List<DatanodeDetails> nodes = getPipeline().getNodes();
+ blockOutputStreams[i] =
+ new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
+ createSingleECBlockPipeline(getPipeline(), nodes.get(i), i +
1),
+ getBufferPool(), getConf(), getToken());
+ }
}
}
- @Override
- void createOutputStream() throws IOException {
- Pipeline ecPipeline = getPipeline();
- List<DatanodeDetails> nodes = getPipeline().getNodes();
- blockOutputStreams[currentStreamIdx] = new ECBlockOutputStream(
- getBlockID(),
- getXceiverClientManager(),
- createSingleECBlockPipeline(
- ecPipeline, nodes.get(currentStreamIdx), currentStreamIdx + 1),
- getBufferPool(),
- getConf(),
- getToken());
- }
-
@Override
public OutputStream getOutputStream() {
if (!isInitialized()) {
@@ -318,6 +309,13 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
List<ECBlockOutputStream> failedStreams = new ArrayList<>();
while (iter.hasNext()) {
final ECBlockOutputStream stream = iter.next();
+ if (!forPutBlock && stream.getWrittenDataLength() <= 0) {
+ // If we did not write any data to this stream yet, let's not consider
+ // for failure checking. But we should do failure checking for putBlock
+ // though. In the case of padding stripes, we do send empty put blocks
+ // for creating empty containers at DNs ( Refer: HDDS-6794).
+ continue;
+ }
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
responseFuture = null;
if (forPutBlock) {
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 3438e3f2f9..5003f8e5e6 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -384,7 +384,9 @@ public class TestOzoneECClient {
keyDetails.getOzoneKeyLocations().get(0).getContainerID())
.setLocalID(
keyDetails.getOzoneKeyLocations().get(0).getLocalID())
- .setBlockCommitSequenceId(1).build());
+ .setBlockCommitSequenceId(1).setReplicaIndex(
+ blockList.getKeyLocations(0).getPipeline()
+ .getMemberReplicaIndexes(i)).build());
List<ContainerProtos.KeyValue> metadataList =
block.getMetadataList().stream().filter(kv -> kv.getKey()
@@ -560,6 +562,9 @@ public class TestOzoneECClient {
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
int dataBlks = 10;
int parityBlks = 4;
+ MultiNodePipelineBlockAllocator blkAllocator =
+ new MultiNodePipelineBlockAllocator(conf, dataBlks + parityBlks, 14);
+ createNewClient(conf, blkAllocator);
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index e57ec400cc..0b037af8c1 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -230,9 +230,12 @@ public class TestContainerCommandsEC {
continue;
}
ListBlockResponseProto response = ContainerProtocolCalls
- .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1,
+ .listBlock(clients.get(i), containerID, null, Integer.MAX_VALUE,
containerToken);
- Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(),
+ Assertions.assertEquals(numExpectedBlocks,
+ response.getBlockDataList().stream().filter(
+ k -> k.getChunksCount() > 0 && k.getChunks(0).getLen() > 0)
+ .collect(Collectors.toList()).size(),
"blocks count doesn't match on DN " + i);
Assertions.assertEquals(numExpectedChunks,
response.getBlockDataList().stream()
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 948c481435..4f0a091771 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -292,7 +292,7 @@ public class TestECKeyOutputStream {
}
@Test
- public void testECContainerKeysCount()
+ public void testECContainerKeysCountAndNumContainerReplicas()
throws IOException, InterruptedException, TimeoutException {
byte[] inputData = getInputBytes(1);
final OzoneBucket bucket = getOzoneBucket();
@@ -320,8 +320,9 @@ public class TestECKeyOutputStream {
GenericTestUtils.waitFor(() -> {
try {
- return containerOperationClient.getContainer(currentKeyContainerID)
- .getNumberOfKeys() == 1;
+ return (containerOperationClient.getContainer(currentKeyContainerID)
+ .getNumberOfKeys() == 1) && (containerOperationClient
+ .getContainerReplicas(currentKeyContainerID).size() == 5);
} catch (IOException exception) {
Assert.fail("Unexpected exception " + exception);
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]