This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 60a41bc HDDS-6249. EC: Fix todo items in TestECKeyOutputStream (#3102)
60a41bc is described below
commit 60a41bc85e1657344a2b2029ad21e7fab741b711
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Thu Feb 17 22:55:16 2022 +0000
HDDS-6249. EC: Fix todo items in TestECKeyOutputStream (#3102)
---
.../SCMContainerPlacementRackScatter.java | 39 +++++++-------
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 1 -
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 25 +++++----
.../ozone/client/rpc/TestECKeyOutputStream.java | 62 ++++++++++------------
4 files changed, 63 insertions(+), 64 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index 7915a5b..8503662 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -242,30 +242,26 @@ public final class SCMContainerPlacementRackScatter
while (true) {
metrics.incrDatanodeChooseAttemptCount();
Node node = networkTopology.chooseRandom(scope, excludedNodes);
- if (node == null) {
- // cannot find the node which meets all constrains
- LOG.warn("Failed to find the datanode for container. excludedNodes:" +
- (excludedNodes == null ? "" : excludedNodes.toString()) +
- ", rack:" + scope);
- return null;
- }
-
- DatanodeDetails datanodeDetails = (DatanodeDetails)node;
- if (isValidNode(datanodeDetails, metadataSizeRequired,
- dataSizeRequired)) {
- metrics.incrDatanodeChooseSuccessCount();
- return node;
+ if (node != null) {
+ DatanodeDetails datanodeDetails = (DatanodeDetails) node;
+ if (isValidNode(datanodeDetails, metadataSizeRequired,
+ dataSizeRequired)) {
+ metrics.incrDatanodeChooseSuccessCount();
+ return node;
+ }
+ // exclude the unavailable node for the following retries.
+ excludedNodes.add(node);
+ } else {
+ LOG.debug("Failed to find the datanode for container. excludedNodes: "
+
+ "{}, rack {}", excludedNodes, scope);
}
- // exclude the unavailable node for the following retries.
- excludedNodes.add(node);
-
maxRetry--;
if (maxRetry == 0) {
// avoid the infinite loop
- String errMsg = "No satisfied datanode to meet the space constrains. "
- + "metadatadata size required: " + metadataSizeRequired +
- " data size required: " + dataSizeRequired;
- LOG.info(errMsg);
+ LOG.info("No satisfied datanode to meet the constraints. "
+ + "Metadatadata size required: {} Data size required: {}, scope "
+ + "{}, excluded nodes {}",
+ metadataSizeRequired, dataSizeRequired, scope, excludedNodes);
return null;
}
}
@@ -304,6 +300,9 @@ public final class SCMContainerPlacementRackScatter
List<DatanodeDetails> excludedNodes) {
Set<Node> lessPreferredRacks = excludedNodes.stream()
.map(node -> networkTopology.getAncestor(node, RACK_LEVEL))
+ // Dead Nodes have been removed from the topology and so have a
+ // null rack. We need to exclude those from the rack list.
+ .filter(node -> node != null)
.collect(Collectors.toSet());
List <Node> result = new ArrayList<>();
for (Node rack : racks) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index b6836af..8f4330e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -532,7 +532,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
closeCurrentStreamEntry();
Preconditions.checkArgument(writeOffset == offset);
- blockOutputStreamEntryPool.getCurrentStreamEntry().close();
blockOutputStreamEntryPool.commitKey(offset);
} finally {
blockOutputStreamEntryPool.cleanup();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 919b9b2..ec23cbb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
@@ -364,16 +366,21 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
startRecon();
}
- private void waitForHddsDatanodesStop() throws TimeoutException,
- InterruptedException {
+ private void waitForHddsDatanodeToStop(DatanodeDetails dn)
+ throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
- final int healthy = scm.getNodeCount(HEALTHY);
- boolean isReady = healthy == hddsDatanodes.size();
- if (!isReady) {
- LOG.info("Waiting on {} datanodes out of {} to be marked unhealthy.",
- healthy, hddsDatanodes.size());
+ NodeStatus status;
+ try {
+ status = scm.getScmNodeManager().getNodeStatus(dn);
+ } catch (NodeNotFoundException e) {
+ return true;
+ }
+ if (status.equals(NodeStatus.inServiceHealthy())) {
+ LOG.info("Waiting on datanode to be marked stale.");
+ return false;
+ } else {
+ return true;
}
- return isReady;
}, 1000, waitForClusterToBeReadyTimeout);
}
@@ -398,7 +405,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
hddsDatanodes.remove(i);
if (waitForDatanode) {
// wait for node to be removed from SCM healthy node list.
- waitForHddsDatanodesStop();
+ waitForHddsDatanodeToStop(datanodeService.getDatanodeDetails());
}
String[] args = new String[] {};
HddsDatanodeService service =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index c3f0765..610a34c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -54,14 +54,13 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
@@ -103,7 +102,8 @@ public class TestECKeyOutputStream {
// closed state and all in progress writes will get exception. To avoid
// that, we are just keeping higher timeout and none of the tests depending
// on deadnode detection timeout currently.
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 300, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
TimeUnit.SECONDS);
conf.setTimeDuration(
@@ -248,31 +248,30 @@ public class TestECKeyOutputStream {
final OzoneBucket bucket = getOzoneBucket();
ContainerOperationClient containerOperationClient =
new ContainerOperationClient(conf);
- List<ContainerInfo> containerInfos =
- containerOperationClient.listContainer(1, 100);
- Map<ContainerID, Long> containerKeys = new HashMap<>();
- for (ContainerInfo info : containerInfos) {
- containerKeys.put(info.containerID(),
- containerKeys.getOrDefault(info.containerID(), 0L) + 1);
+
+ ECReplicationConfig repConfig = new ECReplicationConfig(
+ 3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+ // Close all EC pipelines so we must get a fresh pipeline and hence
+ // container for this test.
+ PipelineManager pm =
+ cluster.getStorageContainerManager().getPipelineManager();
+ for (Pipeline p : pm.getPipelines(repConfig)) {
+ pm.closePipeline(p, true);
}
String keyName = UUID.randomUUID().toString();
try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
- new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
- chunkSize), new HashMap<>())) {
+ repConfig, new HashMap<>())) {
out.write(inputData);
}
OzoneKeyDetails key = bucket.getKey(keyName);
long currentKeyContainerID =
key.getOzoneKeyLocations().get(0).getContainerID();
- Long priorKeys = containerKeys.get(new ContainerID(currentKeyContainerID));
- long expectedKeys = (priorKeys != null ? priorKeys : 0) + 1L;
GenericTestUtils.waitFor(() -> {
try {
- return containerOperationClient
- .listContainer(currentKeyContainerID, 100).get(0)
- .getNumberOfKeys() == expectedKeys;
+ return containerOperationClient.getContainer(currentKeyContainerID)
+ .getNumberOfKeys() == 1;
} catch (IOException exception) {
Assert.fail("Unexpected exception " + exception);
return false;
@@ -324,13 +323,14 @@ public class TestECKeyOutputStream {
byte[] inputData = getInputBytes(numChunks);
final OzoneBucket bucket = getOzoneBucket();
String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
+ DatanodeDetails nodeToKill = null;
try {
try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
chunkSize), new HashMap<>())) {
out.write(inputData);
// Kill a node from first pipeline
- DatanodeDetails nodeToKill =
+ nodeToKill =
((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
.get(0).getPipeline().getFirstNode();
cluster.shutdownHddsDatanode(nodeToKill);
@@ -344,26 +344,20 @@ public class TestECKeyOutputStream {
}
try (OzoneInputStream is = bucket.readKey(keyName)) {
- // TODO: this skip can be removed once read handles online recovery.
- long skip = is.skip(inputData.length);
- Assert.assertTrue(skip == inputData.length);
- // All nodes available in second block group. So, lets assert.
- byte[] fileContent = new byte[inputData.length];
- Assert.assertEquals(inputData.length, is.read(fileContent));
- Assert.assertEquals(new String(inputData, UTF_8),
- new String(fileContent, UTF_8));
+ // We wrote "inputData" twice, so do two reads and ensure the correct
+ // data comes back.
+ for (int i = 0; i < 2; i++) {
+ byte[] fileContent = new byte[inputData.length];
+ Assert.assertEquals(inputData.length, is.read(fileContent));
+ Assert.assertEquals(new String(inputData, UTF_8),
+ new String(fileContent, UTF_8));
+ }
}
} finally {
- // TODO: optimize to just start the killed DN back.
- resetCluster();
+ cluster.restartHddsDatanode(nodeToKill, true);
}
}
- private void resetCluster() throws Exception {
- cluster.shutdown();
- init();
- }
-
private byte[] getInputBytes(int numChunks) {
byte[] inputData = new byte[numChunks * chunkSize];
for (int i = 0; i < numChunks; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]