This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ebacba1a49f HDDS-14040. Ozone client hang for data write in failure scenario (#9401)
ebacba1a49f is described below
commit ebacba1a49ffb744e61772c8f3619c8dd7e6d9c5
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Dec 5 12:01:05 2025 +0530
HDDS-14040. Ozone client hang for data write in failure scenario (#9401)
---
.../server/ratis/ContainerStateMachine.java | 14 ++---
.../server/ratis/TestContainerStateMachine.java | 43 ++++++++-------
.../rpc/TestContainerStateMachineFailures.java | 62 +++++++++++++++++++++-
3 files changed, 93 insertions(+), 26 deletions(-)
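At its core, the change replaces "complete the Raft write future exceptionally" with "complete it normally, carrying an error ContainerCommandResponseProto built via ContainerUtils.logAndReturnError", so the client receives a reply it can act on instead of waiting indefinitely on a write that never answers. A minimal sketch of that pattern, using simplified stand-in types (ErrorResponse and Message here are hypothetical; the real code uses ContainerCommandResponseProto and Ratis' Message):

    import java.util.concurrent.CompletableFuture;

    // Simplified stand-ins for Ratis' Message and Ozone's ContainerCommandResponseProto.
    final class WriteFailureSketch {
      record Message(String content) { }
      record ErrorResponse(String result, String message) {
        Message toMessage() {
          return new Message(result + ": " + message);
        }
      }

      // Old pattern: the failure only completed the future exceptionally,
      // which (per this commit) could leave the client-side write hanging.
      static CompletableFuture<Message> failExceptionally(Exception e) {
        CompletableFuture<Message> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
      }

      // New pattern: convert the failure into an error response and complete
      // the future with it, so a reply always reaches the client.
      static CompletableFuture<Message> failWithErrorResponse(Exception e) {
        ErrorResponse error = new ErrorResponse("CONTAINER_INTERNAL_ERROR", e.getMessage());
        return CompletableFuture.completedFuture(error.toMessage());
      }
    }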
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index d5773b5abe5..1ec30914e2c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -582,7 +582,8 @@ private CompletableFuture<Message> writeStateMachineData(
try {
validateLongRunningWrite();
} catch (StorageContainerException e) {
- return completeExceptionally(e);
+ ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
+ return CompletableFuture.completedFuture(result::toByteString);
}
final WriteChunkRequestProto write = requestProto.getWriteChunk();
RaftServer server = ratisServer.getServer();
@@ -631,8 +632,11 @@ private CompletableFuture<Message> writeStateMachineData(
// see the stateMachine is marked unhealthy by other parallel thread
unhealthyContainers.add(write.getBlockID().getContainerID());
stateMachineHealthy.set(false);
- raftFuture.completeExceptionally(e);
- throw e;
+ StorageContainerException sce = new StorageContainerException("Failed to write chunk data",
+ e, ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+ ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, sce, requestProto);
+ raftFuture.complete(result::toByteString);
+ return result;
} finally {
// Remove the future once it finishes execution from the
writeChunkFutureMap.remove(entryIndex);
@@ -657,8 +661,6 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long
// After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
// that should not crash the pipeline.
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
- StorageContainerException sce =
- new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed:
blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
@@ -669,7 +671,7 @@ private void handleCommandResult(ContainerCommandRequestProto requestProto, long
// handling the entry for the write chunk in cache.
stateMachineHealthy.set(false);
unhealthyContainers.add(write.getBlockID().getContainerID());
- raftFuture.completeExceptionally(sce);
+ raftFuture.complete(r::toByteString);
} else {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
index d0b2dc5358e..fd869c6127e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
@@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -57,6 +58,7 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -120,7 +122,8 @@ public void shutdown() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testWriteFailure(boolean failWithException) throws ExecutionException, InterruptedException {
+ public void testWriteFailure(boolean failWithException)
+ throws ExecutionException, InterruptedException, InvalidProtocolBufferException {
RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
when(entry.getTerm()).thenReturn(1L);
when(entry.getIndex()).thenReturn(1L);
@@ -134,23 +137,28 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
setUpMockDispatcherReturn(failWithException);
setUpMockRequestProtoReturn(context, 1, 1);
- ThrowableCatcher catcher = new ThrowableCatcher();
-
- stateMachine.write(entry, trx).exceptionally(catcher.asSetter()).get();
+ Message message = stateMachine.write(entry, trx).get();
verify(dispatcher, times(1)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
reset(dispatcher);
- assertNotNull(catcher.getReceived());
- assertResults(failWithException, catcher.getCaught());
+ ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto =
+ ContainerProtos.ContainerCommandResponseProto.parseFrom(message.getContent());
+ if (failWithException) {
+ assertEquals("Failed to write chunk data", containerCommandResponseProto.getMessage());
+ assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, containerCommandResponseProto.getResult());
+ } else {
+ // If dispatcher returned failure response, the state machine should propagate the same failure.
+ assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, containerCommandResponseProto.getResult());
+ }
// Writing data to another container(containerId 2) should also fail.
setUpMockRequestProtoReturn(context, 2, 1);
- stateMachine.write(entryNext, trx).exceptionally(catcher.asSetter()).get();
+ message = stateMachine.write(entryNext, trx).get();
verify(dispatcher, times(0)).dispatch(any(ContainerProtos.ContainerCommandRequestProto.class),
any(DispatcherContext.class));
- assertInstanceOf(StorageContainerException.class, catcher.getReceived());
- StorageContainerException sce = (StorageContainerException) catcher.getReceived();
- assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY, sce.getResult());
+ containerCommandResponseProto
+ = ContainerProtos.ContainerCommandResponseProto.parseFrom(message.getContent());
+ assertTrue(containerCommandResponseProto.getMessage().contains("failed, stopping all writes to container"));
}
@ParameterizedTest
@@ -224,20 +232,17 @@ public void testWriteTimout() throws Exception {
}).when(dispatcher).dispatch(any(), any());
setUpMockRequestProtoReturn(context, 1, 1);
- ThrowableCatcher catcher = new ThrowableCatcher();
CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
Thread.sleep(2000);
CompletableFuture<Message> secondWrite = stateMachine.write(entryNext, trx);
- firstWrite.exceptionally(catcher.asSetter()).get();
- assertNotNull(catcher.getCaught());
- assertInstanceOf(InterruptedException.class, catcher.getReceived());
+ ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
+ = ContainerProtos.ContainerCommandResponseProto.parseFrom(firstWrite.get().getContent());
+ assertEquals("Failed to write chunk data", containerCommandResponseProto.getMessage());
- secondWrite.exceptionally(catcher.asSetter()).get();
- assertNotNull(catcher.getReceived());
- assertInstanceOf(StorageContainerException.class, catcher.getReceived());
- StorageContainerException sce = (StorageContainerException) catcher.getReceived();
- assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
+ containerCommandResponseProto
+ = ContainerProtos.ContainerCommandResponseProto.parseFrom(secondWrite.get().getContent());
+ assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, containerCommandResponseProto.getResult());
}
private void setUpMockDispatcherReturn(boolean failWithException) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index c5ff85010bd..22d3a2b5450 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -53,6 +53,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -87,10 +88,12 @@
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -98,6 +101,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ozone.test.tag.Flaky;
@@ -135,7 +139,7 @@ public static void init() throws Exception {
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
- conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 2000,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 200, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
@@ -764,9 +768,14 @@ void testContainerStateMachineSingleFailureRetry()
assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ ContainerSet containerSet = cluster.getHddsDatanode(omKeyLocationInfo.getPipeline().getLeaderNode())
+ .getDatanodeStateMachine().getContainer().getContainerSet();
induceFollowerFailure(omKeyLocationInfo, 2);
key.flush();
+ // wait for container close for failure in flush for both followers applyTransaction failure
+ GenericTestUtils.waitFor(() -> containerSet.getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
+ .getState().equals(ContainerProtos.ContainerDataProto.State.CLOSED), 100, 30000);
key.write("ratis".getBytes(UTF_8));
key.flush();
}
@@ -805,6 +814,57 @@ void testContainerStateMachineDualFailureRetry()
validateData("ratis1", 2, "ratisratisratisratis");
}
+ @Test
+ void testContainerStateMachineAllNodeFailure()
+ throws Exception {
+ // mark all dn volume as full to induce failure
+ List<Pair<StorageVolume, Long>> increasedVolumeSpace = new ArrayList<>();
+ cluster.getHddsDatanodes().forEach(
+ dn -> {
+ List<StorageVolume> volumesList = dn.getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList();
+ volumesList.forEach(sv -> {
+ if (sv.getVolumeUsage().isPresent()) {
+ increasedVolumeSpace.add(Pair.of(sv, sv.getCurrentUsage().getAvailable()));
+ sv.getVolumeUsage().get().incrementUsedSpace(sv.getCurrentUsage().getAvailable());
+ }
+ });
+ }
+ );
+
+ long startTime = Time.monotonicNow();
+ ReplicationConfig replicationConfig = ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+ ReplicationFactor.THREE);
+ try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName).createKey(
+ "testkey1", 1024, replicationConfig, new HashMap<>())) {
+
+ key.write("ratis".getBytes(UTF_8));
+ key.flush();
+ fail();
+ } catch (IOException ex) {
+ assertTrue(ex.getMessage().contains("Retry request failed. retries get
failed due to exceeded" +
+ " maximum allowed retries number: 5"), ex.getMessage());
+ } finally {
+ increasedVolumeSpace.forEach(e -> e.getLeft().getVolumeUsage().ifPresent(
+ p -> p.decrementUsedSpace(e.getRight())));
+ // test execution is less than 2 sec but to be safe putting 30 sec as without fix, taking more than 60 sec
+ assertTrue(Time.monotonicNow() - startTime < 30000, "Operation took longer than expected: "
+ + (Time.monotonicNow() - startTime));
+ }
+
+ // previous pipeline gets closed due to disk full failure, so created a new pipeline and write should succeed,
+ // and this ensures later test case can pass (should not fail due to pipeline unavailability as timeout is 200ms
+ // for pipeline creation which can fail in testcase later on)
+ Pipeline pipeline = cluster.getStorageContainerManager().getPipelineManager().createPipeline(replicationConfig);
+ cluster.getStorageContainerManager().getPipelineManager().waitPipelineReady(pipeline.getId(), 60000);
+
+ try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName).createKey(
+ "testkey2", 1024, replicationConfig, new HashMap<>())) {
+
+ key.write("ratis".getBytes(UTF_8));
+ key.flush();
+ }
+ }
+
private void induceFollowerFailure(OmKeyLocationInfo omKeyLocationInfo,
int failureCount) {
DatanodeID leader = omKeyLocationInfo.getPipeline().getLeaderId();
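On the caller side, the contract the updated tests exercise is that a failed write now completes with a regular reply whose payload is a ContainerCommandResponseProto carrying the error result, rather than an exceptionally completed future. A hedged sketch of decoding such a reply (assuming the usual Ozone protobuf package org.apache.hadoop.hdds.protocol.datanode.proto; the parse-and-inspect pattern mirrors the parseFrom calls added in TestContainerStateMachine):

    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
    import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

    final class WriteReplyDecoder {
      // Parse the Message content (a ByteString) back into the command response.
      static ContainerProtos.ContainerCommandResponseProto decode(ByteString content)
          throws InvalidProtocolBufferException {
        return ContainerProtos.ContainerCommandResponseProto.parseFrom(content);
      }

      // Anything other than SUCCESS carries the failure that previously only
      // surfaced as an exceptionally completed (and potentially hanging) write.
      static boolean isFailure(ContainerProtos.ContainerCommandResponseProto response) {
        return response.getResult() != ContainerProtos.Result.SUCCESS;
      }
    }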