umamaheswararao commented on code in PR #3939:
URL: https://github.com/apache/ozone/pull/3939#discussion_r1021929352
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java:
##########
@@ -81,6 +81,14 @@ public void setRecoveringTimeout(long recoveringTimeout) {
this.recoveringTimeout = recoveringTimeout;
}
+ @VisibleForTesting
+ public long getRecoveringTimeout() {
+ return recoveringTimeout;
+ }
+
Review Comment:
Please remove the below unnecessary lines
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java:
##########
@@ -239,6 +254,96 @@ public void
testContainerRecoveryOverReplicationProcessing()
waitForContainerCount(5, container.containerID(), scm);
}
+ @Test
+ public void testECContainerRecoveryWithTimedOutRecovery() throws Exception {
+ byte[] inputData = getInputBytes(3);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = UUID.randomUUID().toString();
+ final Pipeline pipeline;
+ ECReplicationConfig repConfig =
+ new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, chunkSize);
+ try (OzoneOutputStream out = bucket
+ .createKey(keyName, 1024, repConfig, new HashMap<>())) {
+ out.write(inputData);
+ pipeline = ((ECKeyOutputStream) out.getOutputStream())
+ .getStreamEntries().get(0).getPipeline();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<ContainerInfo> containers =
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainers();
+ ContainerInfo container = null;
+ for (ContainerInfo info : containers) {
+ if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
+ container = info;
+ }
+ }
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ AtomicReference<HddsDatanodeService> reconstructedDN =
+ new AtomicReference<>();
+ ContainerInfo finalContainer = container;
+ Map<HddsDatanodeService, Long> recoveryTimeoutMap = new HashMap<>();
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ recoveryTimeoutMap.put(dn, dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getRecoveringTimeout());
+ dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().setRecoveringTimeout(100);
+
+ ECReconstructionSupervisor ecReconstructionSupervisor =
+ GenericTestUtils.getFieldReflection(dn.getDatanodeStateMachine(),
+ "ecReconstructionSupervisor");
+ ECReconstructionCoordinator coordinator = GenericTestUtils
+ .mockFieldReflection(ecReconstructionSupervisor,
+ "reconstructionCoordinator");
+
+ Mockito.doAnswer(invocation -> {
+ GenericTestUtils.waitFor(() ->
+ dn.getDatanodeStateMachine()
+ .getContainer()
+ .getContainerSet()
+ .getContainer(finalContainer.getContainerID())
+ .getContainerState() ==
+ ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+ 1000, 100000);
+ reconstructedDN.set(dn);
+ invocation.callRealMethod();
+ return null;
+ }).when(coordinator).reconstructECBlockGroup(Mockito.any(),
Mockito.any(),
+ Mockito.any());
+ }
+
Review Comment:
Refer: TestContainerCommandsEC#testCreateRecoveryContainer
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java:
##########
@@ -81,6 +81,14 @@ public void setRecoveringTimeout(long recoveringTimeout) {
this.recoveringTimeout = recoveringTimeout;
}
+ @VisibleForTesting
+ public long getRecoveringTimeout() {
+ return recoveringTimeout;
+ }
+
Review Comment:
Isn't this timeout configured and taken from the configuration?
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java:
##########
@@ -84,9 +84,8 @@ static class RecoveringContainerScrubbingTask implements
BackgroundTask {
@Override
public BackgroundTaskResult call() throws Exception {
- containerSet.getContainer(containerID).delete();
- containerSet.removeContainer(containerID);
- LOG.info("Delete stale recovering container {}", containerID);
+ containerSet.getContainer(containerID).markContainerUnhealthy();
Review Comment:
@errose28 could you please check whether you have any concerns about a container
being moved to UNHEALTHY unexpectedly.
This container was in the RECOVERING state and was moved to UNHEALTHY.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java:
##########
@@ -239,6 +254,96 @@ public void
testContainerRecoveryOverReplicationProcessing()
waitForContainerCount(5, container.containerID(), scm);
}
+ @Test
+ public void testECContainerRecoveryWithTimedOutRecovery() throws Exception {
+ byte[] inputData = getInputBytes(3);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = UUID.randomUUID().toString();
+ final Pipeline pipeline;
+ ECReplicationConfig repConfig =
+ new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, chunkSize);
+ try (OzoneOutputStream out = bucket
+ .createKey(keyName, 1024, repConfig, new HashMap<>())) {
+ out.write(inputData);
+ pipeline = ((ECKeyOutputStream) out.getOutputStream())
+ .getStreamEntries().get(0).getPipeline();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<ContainerInfo> containers =
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainers();
+ ContainerInfo container = null;
+ for (ContainerInfo info : containers) {
+ if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
+ container = info;
+ }
+ }
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ AtomicReference<HddsDatanodeService> reconstructedDN =
+ new AtomicReference<>();
+ ContainerInfo finalContainer = container;
+ Map<HddsDatanodeService, Long> recoveryTimeoutMap = new HashMap<>();
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ recoveryTimeoutMap.put(dn, dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getRecoveringTimeout());
+ dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().setRecoveringTimeout(100);
+
+ ECReconstructionSupervisor ecReconstructionSupervisor =
+ GenericTestUtils.getFieldReflection(dn.getDatanodeStateMachine(),
+ "ecReconstructionSupervisor");
+ ECReconstructionCoordinator coordinator = GenericTestUtils
+ .mockFieldReflection(ecReconstructionSupervisor,
+ "reconstructionCoordinator");
+
+ Mockito.doAnswer(invocation -> {
+ GenericTestUtils.waitFor(() ->
+ dn.getDatanodeStateMachine()
+ .getContainer()
+ .getContainerSet()
+ .getContainer(finalContainer.getContainerID())
+ .getContainerState() ==
+ ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+ 1000, 100000);
+ reconstructedDN.set(dn);
+ invocation.callRealMethod();
+ return null;
+ }).when(coordinator).reconstructECBlockGroup(Mockito.any(),
Mockito.any(),
+ Mockito.any());
+ }
+
Review Comment:
So, basically the reconstruction trigger below has nothing to do with the Unhealthy
containers above. Basically you are just testing that a lower timeout moves the container
to unhealthy, instead of deleting it.
I am wondering if we can create a recovering container explicitly and make the
scrubber trigger and convert the container to unhealthy? Then if you write
bytes into that container, that should not move the container without a
replicaIndex.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java:
##########
@@ -239,6 +254,96 @@ public void
testContainerRecoveryOverReplicationProcessing()
waitForContainerCount(5, container.containerID(), scm);
}
+ @Test
+ public void testECContainerRecoveryWithTimedOutRecovery() throws Exception {
+ byte[] inputData = getInputBytes(3);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = UUID.randomUUID().toString();
+ final Pipeline pipeline;
+ ECReplicationConfig repConfig =
+ new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, chunkSize);
+ try (OzoneOutputStream out = bucket
+ .createKey(keyName, 1024, repConfig, new HashMap<>())) {
+ out.write(inputData);
+ pipeline = ((ECKeyOutputStream) out.getOutputStream())
+ .getStreamEntries().get(0).getPipeline();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<ContainerInfo> containers =
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainers();
+ ContainerInfo container = null;
+ for (ContainerInfo info : containers) {
+ if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
+ container = info;
+ }
+ }
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ AtomicReference<HddsDatanodeService> reconstructedDN =
+ new AtomicReference<>();
+ ContainerInfo finalContainer = container;
+ Map<HddsDatanodeService, Long> recoveryTimeoutMap = new HashMap<>();
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ recoveryTimeoutMap.put(dn, dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getRecoveringTimeout());
+ dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().setRecoveringTimeout(100);
+
+ ECReconstructionSupervisor ecReconstructionSupervisor =
+ GenericTestUtils.getFieldReflection(dn.getDatanodeStateMachine(),
+ "ecReconstructionSupervisor");
+ ECReconstructionCoordinator coordinator = GenericTestUtils
+ .mockFieldReflection(ecReconstructionSupervisor,
+ "reconstructionCoordinator");
+
+ Mockito.doAnswer(invocation -> {
+ GenericTestUtils.waitFor(() ->
+ dn.getDatanodeStateMachine()
+ .getContainer()
+ .getContainerSet()
+ .getContainer(finalContainer.getContainerID())
+ .getContainerState() ==
+ ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+ 1000, 100000);
+ reconstructedDN.set(dn);
+ invocation.callRealMethod();
+ return null;
+ }).when(coordinator).reconstructECBlockGroup(Mockito.any(),
Mockito.any(),
+ Mockito.any());
+ }
+
Review Comment:
typo: sown -> down
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java:
##########
@@ -126,25 +130,29 @@ public void closeContainer(long containerID,
DatanodeDetails dn,
* @param repConfig - Replication config.
* @param encodedToken - Token
*/
- public void deleteRecoveringContainer(long containerID, DatanodeDetails dn,
- ECReplicationConfig repConfig, String encodedToken) throws IOException {
+ public void deleteContainerInState(long containerID, DatanodeDetails dn,
+ ECReplicationConfig repConfig, String encodedToken,
+ Set<State> acceptableStates) throws IOException {
XceiverClientSpi xceiverClient = this.xceiverClientManager
.acquireClient(singleNodePipeline(dn, repConfig));
try {
// Before deleting the recovering container, just make sure that state is
- // Recovering. There will be still race condition, but this will avoid
- // most usual case.
+ // Recovering & Unhealthy. There will be still race condition,
+ // but this will avoid most usual case.
ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
ContainerProtocolCalls
.readContainer(xceiverClient, containerID, encodedToken);
- if (readContainerResponseProto
- .getContainerData()
- .getState() == ContainerProtos.ContainerDataProto.State.RECOVERING) {
- ContainerProtocolCalls
- .deleteContainer(xceiverClient, containerID, true, encodedToken);
+ State currentState =
+ readContainerResponseProto.getContainerData().getState();
+ if (!Objects.isNull(acceptableStates)
+ && acceptableStates.contains(currentState)) {
+ ContainerProtocolCalls.deleteContainer(xceiverClient, containerID,
+ true, encodedToken);
} else {
- LOG.warn("Container will not be deleted as it is not a recovering"
- + " container {}", containerID);
+ LOG.warn("Container {} will not be deleted as current state " +
+ "not in acceptable states. Current state: {}, " +
+ "Acceptable State: {}", containerID, currentState,
Review Comment:
Acceptable State: {} --> Acceptable States: {}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]