This is an automated email from the ASF dual-hosted git repository. elek pushed a commit to branch HDDS-2281 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit a4013be81d703e7eee8343b25e1f0e76b2802415 Author: Shashikant Banerjee <shashik...@apache.org> AuthorDate: Fri Oct 11 01:53:44 2019 +0530 HDDS-2281. ContainerStateMachine#handleWriteChunk should ignore close container exception. --- .../server/ratis/ContainerStateMachine.java | 12 ++- .../rpc/TestContainerStateMachineFailures.java | 108 +++++++++++++++++++++ .../ozone/container/ContainerTestHelper.java | 19 +++- 3 files changed, 137 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index b89ec73..a124448 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -460,6 +460,10 @@ public class ContainerStateMachine extends BaseStateMachine { LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + write.getChunkData().getChunkName() + e); + metrics.incNumWriteDataFails(); + // write chunks go in parallel. It's possible that one write chunk + // see the stateMachine is marked unhealthy by other parallel thread. + stateMachineHealthy.set(false); raftFuture.completeExceptionally(e); throw e; } @@ -474,7 +478,9 @@ public class ContainerStateMachine extends BaseStateMachine { // Remove the future once it finishes execution from the // writeChunkFutureMap. 
writeChunkFuture.thenApply(r -> { - if (r.getResult() != ContainerProtos.Result.SUCCESS) { + if (r.getResult() != ContainerProtos.Result.SUCCESS + && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN + && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) { StorageContainerException sce = new StorageContainerException(r.getMessage(), r.getResult()); LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" + @@ -482,6 +488,7 @@ public class ContainerStateMachine extends BaseStateMachine { write.getChunkData().getChunkName() + " Error message: " + r.getMessage() + " Container Result: " + r.getResult()); metrics.incNumWriteDataFails(); + stateMachineHealthy.set(false); raftFuture.completeExceptionally(sce); } else { metrics.incNumBytesWrittenCount( @@ -584,6 +591,7 @@ public class ContainerStateMachine extends BaseStateMachine { LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : " + "{} Container Result: {}", gid, response.getCmdType(), index, response.getMessage(), response.getResult()); + stateMachineHealthy.set(false); throw sce; } @@ -739,6 +747,8 @@ public class ContainerStateMachine extends BaseStateMachine { LOG.error("gid {} : ApplyTransaction failed. 
cmd {} logIndex " + "{} exception {}", gid, requestProto.getCmdType(), index, e); + stateMachineHealthy.compareAndSet(true, false); + metrics.incNumApplyTransactionsFails(); applyTransactionFuture.completeExceptionally(e); throw e; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index 9ac45b8..8dde3f7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -62,7 +64,10 @@ import java.io.IOException; import java.nio.file.Path; import java.util.HashMap; import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.hadoop.hdds.HddsConfigKeys. HDDS_COMMAND_STATUS_REPORT_INTERVAL; @@ -78,6 +83,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys. OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Tests the containerStateMachine failure handling. 
@@ -418,6 +424,108 @@ public class TestContainerStateMachineFailures { Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath())); } + // This test sends many write chunk requests concurrently with a close + // container request, inducing a situation where a writeStateMachine call + // executes after the close container transaction has been applied and + // therefore fails. Even so, the stateMachine must not be marked unhealthy, + // and the pipeline must not fail, merely because the container was closed + // in the meantime. + @Test + public void testWriteStateMachineDataIdempotencyWithClosedContainer() + throws Exception { + OzoneOutputStream key = + objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("ratis-1", 1024, ReplicationType.RATIS, + ReplicationFactor.ONE, new HashMap<>()); + // First write and flush creates a container in the datanode + key.write("ratis".getBytes()); + key.flush(); + key.write("ratis".getBytes()); + KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertEquals(1, locationInfoList.size()); + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + ContainerData containerData = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(omKeyLocationInfo.getContainerID()) + .getContainerData(); + Assert.assertTrue(containerData instanceof KeyValueContainerData); + key.close(); + ContainerStateMachine stateMachine = + (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster); + SimpleStateMachineStorage storage = + (SimpleStateMachineStorage) stateMachine.getStateMachineStorage(); + Path parentPath = storage.findLatestSnapshot().getFile().getPath(); + // Since the snapshot threshold is set to 1 and there have been + // applyTransactions, we should see snapshots +
Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0); + FileInfo snapshot = storage.findLatestSnapshot().getFile(); + Assert.assertNotNull(snapshot); + long containerID = omKeyLocationInfo.getContainerID(); + Pipeline pipeline = cluster.getStorageContainerLocationClient() + .getContainerWithPipeline(containerID).getPipeline(); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + CountDownLatch latch = new CountDownLatch(100); + int count = 0; + AtomicInteger failCount = new AtomicInteger(0); + Runnable r1 = () -> { + try { + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); + request.setCmdType(ContainerProtos.Type.CloseContainer); + request.setContainerID(containerID); + request.setCloseContainer( + ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); + xceiverClient.sendCommand(request.build()); + } catch (IOException e) { + failCount.incrementAndGet(); + } + }; + Runnable r2 = () -> { + try { + xceiverClient.sendCommand(ContainerTestHelper + .getWriteChunkRequest(pipeline, omKeyLocationInfo.getBlockID(), + 1024, new Random().nextInt())); + latch.countDown(); + } catch (IOException e) { + latch.countDown(); + if (!(HddsClientUtils + .checkForException(e) instanceof ContainerNotOpenException)) { + failCount.incrementAndGet(); + } + } + }; + + for (int i=0 ; i < 100; i++) { + count++; + new Thread(r2).start(); + } + + new Thread(r1).start(); + latch.await(600, TimeUnit.SECONDS); + if (failCount.get() > 0) { + fail("testWriteStateMachineDataIdempotencyWithClosedContainer failed"); + } + Assert.assertTrue( + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet().getContainer(containerID) + .getContainerState() + == ContainerProtos.ContainerDataProto.State.CLOSED); + Assert.assertTrue(stateMachine.isStateMachineHealthy()); 
+ try { + stateMachine.takeSnapshot(); + } catch (IOException ioe) { + Assert.fail("Exception should not be thrown"); + } + FileInfo latestSnapshot = storage.findLatestSnapshot().getFile(); + Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath())); + + } + @Test public void testValidateBCSIDOnDnRestart() throws Exception { OzoneOutputStream key = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 395bda0..0b8dffc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -227,6 +227,23 @@ public final class ContainerTestHelper { Pipeline pipeline, BlockID blockID, int datalen) throws IOException { LOG.trace("writeChunk {} (blockID={}) to pipeline=", datalen, blockID, pipeline); + return getWriteChunkRequest(pipeline, blockID, datalen, 0); + } + + /** + * Returns a writeChunk Request. + * + * @param pipeline - A set of machines where this container lives. + * @param blockID - Block ID of the chunk. + * @param datalen - Length of data. 
+ * @param seq - Sequence number passed to getChunk for the chunk info. + * @return ContainerCommandRequestProto + * @throws IOException + */ + public static ContainerCommandRequestProto getWriteChunkRequest( + Pipeline pipeline, BlockID blockID, int datalen, int seq) throws IOException { + LOG.trace("writeChunk {} (blockID={}) to pipeline=", + datalen, blockID, pipeline); ContainerProtos.WriteChunkRequestProto.Builder writeRequest = ContainerProtos.WriteChunkRequestProto .newBuilder(); @@ -234,7 +251,7 @@ public final class ContainerTestHelper { writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); ByteBuffer data = getData(datalen); - ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen); + ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, datalen); setDataChecksum(info, data); writeRequest.setChunkData(info.getProtoBufMessage()); --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org