ashishkumar50 commented on code in PR #5978:
URL: https://github.com/apache/ozone/pull/5978#discussion_r1453101627


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -377,71 +377,67 @@ public TransactionContext 
startTransaction(RaftClientRequest request)
       ctxt.setException(ioe);
       return ctxt;
     }
+
+    boolean blockAlreadyFinalized = false;
     if (proto.getCmdType() == Type.PutBlock) {
-      TransactionContext ctxt = rejectRequest(request,
-          proto.getContainerID(), proto.getPutBlock().getBlockData()
-          .getBlockID().getLocalID());
-      if (ctxt != null) {
-        return ctxt;
-      }
+      blockAlreadyFinalized = 
shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID());
     } else if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
-      TransactionContext ctxt = rejectRequest(request,
-          proto.getContainerID(), write.getBlockID().getLocalID());
-      if (ctxt != null) {
-        return ctxt;
+      blockAlreadyFinalized = shouldRejectRequest(write.getBlockID());
+      if (!blockAlreadyFinalized) {
+        // create the log entry proto
+        final WriteChunkRequestProto commitWriteChunkProto =
+            WriteChunkRequestProto.newBuilder()
+                .setBlockID(write.getBlockID())
+                .setChunkData(write.getChunkData())
+                // skipping the data field as it is
+                // already set in statemachine data proto
+                .build();
+        ContainerCommandRequestProto commitContainerCommandProto =
+            ContainerCommandRequestProto
+                .newBuilder(proto)
+                .setWriteChunk(commitWriteChunkProto)
+                .setTraceID(proto.getTraceID())
+                .build();
+        Preconditions.checkArgument(write.hasData());
+        Preconditions.checkArgument(!write.getData().isEmpty());
+
+        return TransactionContext.newBuilder()
+            .setClientRequest(request)
+            .setStateMachine(this)
+            .setServerRole(RaftPeerRole.LEADER)
+            .setStateMachineContext(startTime)
+            .setStateMachineData(write.getData())
+            .setLogData(commitContainerCommandProto.toByteString())
+            .build();
       }
-      // create the log entry proto
-      final WriteChunkRequestProto commitWriteChunkProto =
-          WriteChunkRequestProto.newBuilder()
-              .setBlockID(write.getBlockID())
-              .setChunkData(write.getChunkData())
-              // skipping the data field as it is
-              // already set in statemachine data proto
-              .build();
-      ContainerCommandRequestProto commitContainerCommandProto =
-          ContainerCommandRequestProto
-              .newBuilder(proto)
-              .setWriteChunk(commitWriteChunkProto)
-              .setTraceID(proto.getTraceID())
-              .build();
-      Preconditions.checkArgument(write.hasData());
-      Preconditions.checkArgument(!write.getData().isEmpty());
-
-      return TransactionContext.newBuilder()
-          .setClientRequest(request)
-          .setStateMachine(this)
-          .setServerRole(RaftPeerRole.LEADER)
-          .setStateMachineContext(startTime)
-          .setStateMachineData(write.getData())
-          .setLogData(commitContainerCommandProto.toByteString())
-          .build();
     } else if (proto.getCmdType() == Type.FinalizeBlock) {
       containerController.addFinalizedBlock(proto.getContainerID(),
           proto.getFinalizeBlock().getBlockID().getLocalID());
     }
-    return TransactionContext.newBuilder()
-        .setClientRequest(request)
-        .setStateMachine(this)
-        .setServerRole(RaftPeerRole.LEADER)
-        .setStateMachineContext(startTime)
-        .setLogData(proto.toByteString())
-        .build();
-  }
-
-  @Nullable
-  private TransactionContext rejectRequest(RaftClientRequest request,
-              long containerId, long localId) {
-    if (containerController.isFinalizedBlockExist(containerId, localId)) {
+    if (blockAlreadyFinalized) {
       TransactionContext ctxt = TransactionContext.newBuilder()
           .setClientRequest(request)
           .setStateMachine(this)
           .setServerRole(RaftPeerRole.LEADER)
           .build();
-      ctxt.setException(new IOException("Block already finalized"));
+      ctxt.setException(new StorageContainerException("Block already 
finalized",
+          ContainerProtos.Result.BLOCK_ALREADY_FINALIZED));
       return ctxt;
+    } else {
+      return TransactionContext.newBuilder()
+          .setClientRequest(request)
+          .setStateMachine(this)
+          .setServerRole(RaftPeerRole.LEADER)
+          .setStateMachineContext(startTime)
+          .setLogData(proto.toByteString())
+          .build();
     }
-    return null;
+  }
+
+  @Nullable

Review Comment:
   The `@Nullable` annotation is not required here and can be removed.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java:
##########
@@ -175,11 +195,262 @@ public void testOBSRecoveryShouldFail() throws Exception 
{
         conf.get(OZONE_OM_ADDRESS_KEY));
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final String dir = OZONE_ROOT + bucket.getVolumeName() +
+    final String directory = OZONE_ROOT + bucket.getVolumeName() +
         OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
+    final Path f = new Path(directory, "file");
 
     RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
-    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(file));
+    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f));
+  }
+
+  @Test
+  public void testFinalizeBlockFailure() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      StorageContainerException sce = new StorageContainerException(
+          "Requested operation not allowed as ContainerState is CLOSED",
+          ContainerProtos.Result.CLOSED_CONTAINER_IO);
+      injector.setException(sce);
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(BasicRootedOzoneClientAdapterImpl.LOG);
+
+      fs.recoverLease(file);
+      assertTrue(logs.getOutput().contains("Failed to execute finalizeBlock 
command"));
+      assertTrue(logs.getOutput().contains("Requested operation not allowed as 
ContainerState is CLOSED"));
+
+      // The lease should have been recovered.
+      assertTrue("File should be closed", fs.isFileClosed(file));
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @Test
+  public void testBlockPipelineClosed() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline
+      StorageContainerManager scm = cluster.getStorageContainerManager();
+      ContainerInfo container = 
scm.getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(scm, container);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return 
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();

Review Comment:
   Do we need to wait here to check whether the pipeline is CLOSED? I don't see 
this check in the other test cases, and `closeContainer()` already waits for the 
container to reach the CLOSED state.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java:
##########
@@ -175,11 +195,262 @@ public void testOBSRecoveryShouldFail() throws Exception 
{
         conf.get(OZONE_OM_ADDRESS_KEY));
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final String dir = OZONE_ROOT + bucket.getVolumeName() +
+    final String directory = OZONE_ROOT + bucket.getVolumeName() +
         OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
+    final Path f = new Path(directory, "file");
 
     RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
-    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(file));
+    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f));
+  }
+
+  @Test
+  public void testFinalizeBlockFailure() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      StorageContainerException sce = new StorageContainerException(
+          "Requested operation not allowed as ContainerState is CLOSED",
+          ContainerProtos.Result.CLOSED_CONTAINER_IO);
+      injector.setException(sce);
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(BasicRootedOzoneClientAdapterImpl.LOG);
+
+      fs.recoverLease(file);
+      assertTrue(logs.getOutput().contains("Failed to execute finalizeBlock 
command"));
+      assertTrue(logs.getOutput().contains("Requested operation not allowed as 
ContainerState is CLOSED"));
+
+      // The lease should have been recovered.
+      assertTrue("File should be closed", fs.isFileClosed(file));
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @Test
+  public void testBlockPipelineClosed() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline
+      StorageContainerManager scm = cluster.getStorageContainerManager();
+      ContainerInfo container = 
scm.getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(scm, container);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return 
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
+        } catch (PipelineNotFoundException e) {
+          throw new RuntimeException(e);
+        }
+      }, 200, 30000);
+
+      fs.recoverLease(file);
+
+      // The lease should have been recovered.
+      assertTrue("File should be closed", fs.isFileClosed(file));
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws 
Exception {
+    // reduce read timeout
+    conf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
+    // set force recovery
+    System.setProperty("OZONE.CLIENT.RECOVER.LEASE.FORCE", 
String.valueOf(forceRecovery));
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline and container
+      ContainerInfo container = 
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), 
container);
+      // pause getCommittedBlockLength handling on all DNs to make sure all 
getCommittedBlockLength will time out
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+      if (!forceRecovery) {
+        assertThrows(IOException.class, () -> fs.recoverLease(file));
+        return;
+      } else {
+        fs.recoverLease(file);
+      }
+      assertEquals(3, StringUtils.countMatches(logs.getOutput(),
+          "Executing command cmdType: GetCommittedBlockLength"));
+
+      // The lease should have been recovered.
+      assertTrue("File should be closed", fs.isFileClosed(file));
+      FileStatus fileStatus = fs.getFileStatus(file);
+      // Since all DNs are out, then the length in OM keyInfo will be used as 
the final file length
+      assertEquals(dataSize, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+      KeyValueHandler.setInjector(null);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize, file, fs);
+  }
+
+  @Test
+  public void testGetCommittedBlockLengthWithException() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline and container
+      ContainerInfo container = 
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), 
container);
+      // throw exception on first DN getCommittedBlockLength handling
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      StorageContainerException sce = new StorageContainerException(
+          "ContainerID " + container.getContainerID() + " does not exist",
+          ContainerProtos.Result.CONTAINER_NOT_FOUND);
+      injector.setException(sce);
+
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+      fs.recoverLease(file);
+
+      assertEquals(2, StringUtils.countMatches(logs.getOutput(),
+          "Executing command cmdType: GetCommittedBlockLength"));
+      assertEquals(1, StringUtils.countMatches(logs.getOutput(),
+          "Failed to execute command cmdType: GetCommittedBlockLength"));
+
+      // The lease should have been recovered.
+      assertTrue("File should be closed", fs.isFileClosed(file));
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+      KeyValueHandler.setInjector(null);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @Test
+  public void testOMConnectionFailure() throws Exception {
+    // reduce hadoop RPC retry max attempts
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
+    conf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // close OM
+      cluster.getOzoneManager().stop();
+      assertThrows(ConnectException.class, () -> fs.recoverLease(file));
+    } finally {
+      try {
+        stream.close();
+      } catch (Throwable e) {
+      }
+      cluster.getOzoneManager().restart();
+      cluster.waitForClusterToBeReady();

Review Comment:
   Can we verify that lease recovery works fine after the OM restart?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to