This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new dfa8141 HDDS-6237: EC: putBlock should pass close flag true on end of
block group/close file. (#3026)
dfa8141 is described below
commit dfa814193706de44a18c1eb1764aecd3e1de0ed9
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Wed Feb 2 10:22:39 2022 -0800
HDDS-6237: EC: putBlock should pass close flag true on end of block
group/close file. (#3026)
Co-authored-by: Uma Maheswara Rao G <[email protected]>
---
.../ozone/client/io/ECBlockOutputStreamEntry.java | 4 +-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 32 +++++++------
.../ozone/client/rpc/TestECKeyOutputStream.java | 54 +++++++++++++++++++++-
3 files changed, 72 insertions(+), 18 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 69c3735..1f9d82c 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -265,7 +265,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
.build();
}
- void executePutBlock() throws IOException {
+ void executePutBlock(boolean isClose) {
if (!isInitialized()) {
return;
}
@@ -274,7 +274,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry{
continue;
}
try {
- stream.executePutBlock(false, true);
+ stream.executePutBlock(isClose, true);
} catch (Exception e) {
stream.setIoException(e);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 1e99210..db51a65 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -199,8 +199,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
writeOffset += len;
}
- private StripeWriteStatus rewriteStripeToNewBlockGroup(int chunkSize,
- int failedStripeDataSize, boolean allocateBlockIfFull)
+ private StripeWriteStatus rewriteStripeToNewBlockGroup(
+ int failedStripeDataSize, boolean allocateBlockIfFull, boolean close)
throws IOException {
long[] failedDataStripeChunkLens = new long[numDataBlks];
long[] failedParityStripeChunkLens = new long[numParityBlks];
@@ -250,7 +250,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
if (hasWriteFailure()) {
return StripeWriteStatus.FAILED;
}
- currentStreamEntry.executePutBlock();
+ currentStreamEntry.executePutBlock(close);
if (hasPutBlockFailure()) {
return StripeWriteStatus.FAILED;
@@ -285,10 +285,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
int currentStreamIdx = currentStreamEntry.getCurrentStreamIdx();
if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
//Lets encode and write
- if (handleParityWrites(ecChunkSize,
- allocateBlockIfFull) == StripeWriteStatus.FAILED) {
- handleStripeFailure(numDataBlks * ecChunkSize, ecChunkSize,
- allocateBlockIfFull);
+ boolean shouldClose = currentStreamEntry.getRemaining() <= 0;
+ if (handleParityWrites(ecChunkSize, allocateBlockIfFull,
+ shouldClose) == StripeWriteStatus.FAILED) {
+ handleStripeFailure(numDataBlks * ecChunkSize, allocateBlockIfFull,
+ shouldClose);
} else {
// At this stage stripe write is successful.
currentStreamEntry.updateBlockGroupToAckedPosition(
@@ -299,7 +300,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
}
private StripeWriteStatus handleParityWrites(int parityCellSize,
- boolean allocateBlockIfFull)
+ boolean allocateBlockIfFull, boolean isLastStripe)
throws IOException {
writeParityCells(parityCellSize);
if (hasWriteFailure()) {
@@ -311,7 +312,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
// TODO: we should alter the put block calls to share CRC to each stream.
ECBlockOutputStreamEntry streamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
- streamEntry.executePutBlock();
+ streamEntry
+ .executePutBlock(isLastStripe);
if (hasPutBlockFailure()) {
return StripeWriteStatus.FAILED;
@@ -528,8 +530,8 @@ public class ECKeyOutputStream extends KeyOutputStream {
lastStripeSize < ecChunkSize ? lastStripeSize : ecChunkSize;
addPadding(parityCellSize);
if (handleParityWrites(parityCellSize,
- false) == StripeWriteStatus.FAILED) {
- handleStripeFailure(lastStripeSize, parityCellSize, false);
+ false, true) == StripeWriteStatus.FAILED) {
+ handleStripeFailure(lastStripeSize, false, true);
} else {
blockOutputStreamEntryPool.getCurrentStreamEntry()
.updateBlockGroupToAckedPosition(
@@ -549,14 +551,14 @@ public class ECKeyOutputStream extends KeyOutputStream {
ecChunkBufferCache.release();
}
- private void handleStripeFailure(int lastStripeSize, int parityCellSize,
- boolean allocateBlockIfFull)
+ private void handleStripeFailure(int lastStripeSize,
+ boolean allocateBlockIfFull, boolean isClose)
throws IOException {
StripeWriteStatus stripeWriteStatus;
for (int i = 0; i < this.config.getMaxECStripeWriteRetries(); i++) {
stripeWriteStatus =
- rewriteStripeToNewBlockGroup(parityCellSize, lastStripeSize,
- allocateBlockIfFull);
+ rewriteStripeToNewBlockGroup(lastStripeSize,
+ allocateBlockIfFull, isClose);
if (stripeWriteStatus == StripeWriteStatus.SUCCESS) {
return;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 73e02a5..cc4c0ae 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
@@ -26,6 +27,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -34,12 +38,14 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -48,8 +54,11 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
@@ -103,7 +112,10 @@ public class TestECKeyOutputStream {
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
-
+ conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+ TimeUnit.SECONDS);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
.setTotalPipelineNumLimit(10).setBlockSize(blockSize)
.setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
@@ -229,6 +241,46 @@ public class TestECKeyOutputStream {
validateContent(inputData, bucket, bucket.getKey(keyName));
}
+ @Test
+ public void testECContainerKeysCount()
+ throws IOException, InterruptedException, TimeoutException {
+ byte[] inputData = getInputBytes(1);
+ final OzoneBucket bucket = getOzoneBucket();
+ ContainerOperationClient containerOperationClient =
+ new ContainerOperationClient(conf);
+ List<ContainerInfo> containerInfos =
+ containerOperationClient.listContainer(1, 100);
+ Map<ContainerID, Long> containerKeys = new HashMap<>();
+ for (ContainerInfo info : containerInfos) {
+ containerKeys.put(info.containerID(),
+ containerKeys.getOrDefault(info.containerID(), 0L) + 1);
+ }
+
+ String keyName = UUID.randomUUID().toString();
+ try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ out.write(inputData);
+ }
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ long currentKeyContainerID =
+ key.getOzoneKeyLocations().get(0).getContainerID();
+ Long priorKeys = containerKeys.get(new ContainerID(currentKeyContainerID));
+ long expectedKeys = (priorKeys != null ? priorKeys : 0) + 1L;
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return containerOperationClient
+ .listContainer(currentKeyContainerID, 100).get(0)
+ .getNumberOfKeys() == expectedKeys;
+ } catch (IOException exception) {
+ Assert.fail("Unexpected exception " + exception);
+ return false;
+ }
+ }, 100, 10000);
+ validateContent(inputData, bucket, key);
+ }
+
private void validateContent(byte[] inputData, OzoneBucket bucket,
OzoneKey key) throws IOException {
try (OzoneInputStream is = bucket.readKey(key.getName())) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]