This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a5aa549dc8 HDDS-7829. SCM to reject adding container to closed 
pipeline (#4212)
a5aa549dc8 is described below

commit a5aa549dc84433ac74aecf08024b4081c5a55574
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Feb 9 20:03:43 2023 -0800

    HDDS-7829. SCM to reject adding container to closed pipeline (#4212)
---
 .../hadoop/hdds/scm/exceptions/SCMException.java   |   2 +
 .../pipeline/DuplicatedPipelineIdException.java    |  31 +++++
 .../pipeline/InvalidPipelineStateException.java    |  30 +++++
 .../src/main/proto/ScmServerProtocol.proto         |   2 +
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  17 ++-
 .../scm/pipeline/PipelineStateManagerImpl.java     |   4 +-
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  23 ++--
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |  10 +-
 .../container/TestScmApplyTransactionFailure.java  | 147 +++++++++++++++++++++
 9 files changed, 246 insertions(+), 20 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index cd1ea08ac0..7ded5f869a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -133,5 +133,7 @@ public class SCMException extends IOException {
     CONTAINER_REPLICA_NOT_FOUND,
     FAILED_TO_CONNECT_TO_CRL_SERVICE,
     FAILED_TO_ADD_CRL_CLIENT,
+    INVALID_PIPELINE_STATE,
+    DUPLICATED_PIPELINE_ID
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/DuplicatedPipelineIdException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/DuplicatedPipelineIdException.java
new file mode 100644
index 0000000000..dce3b7bbfc
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/DuplicatedPipelineIdException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+/**
+ * Signals that a duplicate pipeline ID was detected.
+ */
+public class DuplicatedPipelineIdException extends SCMException {
+
+  public DuplicatedPipelineIdException(String message) {
+    super(message, ResultCodes.DUPLICATED_PIPELINE_ID);
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InvalidPipelineStateException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InvalidPipelineStateException.java
new file mode 100644
index 0000000000..040db21ba2
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InvalidPipelineStateException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+/**
+ * Signals that a pipeline state is not valid for an operation.
+ */
+public class InvalidPipelineStateException extends SCMException {
+  public InvalidPipelineStateException(String message) {
+    super(message, ResultCodes.INVALID_PIPELINE_STATE);
+  }
+}
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 90ce7f7bbf..6df2db8569 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -130,6 +130,8 @@ enum Status {
   CONTAINER_REPLICA_NOT_FOUND = 37;
   FAILED_TO_CONNECT_TO_CRL_SERVICE = 38;
   FAILED_TO_ADD_CRL_CLIENT = 39;
+  INVALID_PIPELINE_STATE = 40;
+  DUPLICATED_PIPELINE_ID = 41;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 3d0f226460..5e5298a967 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -34,6 +34,8 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -136,7 +138,20 @@ public class SCMStateMachine extends BaseStateMachine {
     try {
       final SCMRatisRequest request = SCMRatisRequest.decode(
           Message.valueOf(trx.getStateMachineLogEntry().getLogData()));
-      applyTransactionFuture.complete(process(request));
+
+      try {
+        applyTransactionFuture.complete(process(request));
+      } catch (SCMException ex) {
+        // For SCM exceptions while applying a transaction, if the error
+        // code indicates a FATAL issue, let it crash SCM.
+        if (ex.getResult() == ResultCodes.INTERNAL_ERROR
+            || ex.getResult() == ResultCodes.IO_EXCEPTION) {
+          throw ex;
+        }
+        // Otherwise, it's considered as a logical rejection and is returned to
+        // Ratis client, leaving SCM intact.
+        applyTransactionFuture.completeExceptionally(ex);
+      }
 
       // After previous term transactions are applied, still in safe mode,
       // perform refreshAndValidate to update the safemode rule state.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
index 29a8f05712..8d9b7744c1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
@@ -96,10 +96,10 @@ public class PipelineStateManagerImpl implements 
PipelineStateManager {
     try {
       Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
       if (pipelineStore != null) {
-        transactionBuffer
-            .addToBuffer(pipelineStore, pipeline.getId(), pipeline);
         pipelineStateMap.addPipeline(pipeline);
         nodeManager.addPipeline(pipeline);
+        transactionBuffer
+            .addToBuffer(pipelineStore, pipeline.getId(), pipeline);
         LOG.info("Created pipeline {}.", pipeline);
       }
     } finally {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 5db887b674..a2c9488eef 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -39,6 +39,8 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import static java.lang.String.format;
+
 /**
  * Holds the data structures which maintain the information about pipeline and
  * its state.
@@ -80,8 +82,8 @@ class PipelineStateMap {
 
     if (pipelineMap.putIfAbsent(pipeline.getId(), pipeline) != null) {
       LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getId());
-      throw new IOException(String
-          .format("Duplicate pipeline ID %s detected.", pipeline.getId()));
+      throw new DuplicatedPipelineIdException(
+          format("Duplicate pipeline ID %s detected.", pipeline.getId()));
     }
     pipeline2container.put(pipeline.getId(), new TreeSet<>());
     if (pipeline.getPipelineState() == PipelineState.OPEN) {
@@ -106,9 +108,8 @@ class PipelineStateMap {
 
     Pipeline pipeline = getPipeline(pipelineID);
     if (pipeline.isClosed()) {
-      LOG.warn("Adding container {} to pipeline={} in CLOSED state." +
-          " This happens only for some exceptional cases." +
-          " Check for the previous exceptions.", containerID, pipelineID);
+      throw new InvalidPipelineStateException(format(
+          "Cannot add container to pipeline=%s in closed state", pipelineID));
     }
     pipeline2container.get(pipelineID).add(containerID);
   }
@@ -154,7 +155,7 @@ class PipelineStateMap {
     Pipeline pipeline = pipelineMap.get(pipelineID);
     if (pipeline == null) {
       throw new PipelineNotFoundException(
-          String.format("%s not found", pipelineID));
+          format("%s not found", pipelineID));
     }
     return pipeline;
   }
@@ -314,7 +315,7 @@ class PipelineStateMap {
     NavigableSet<ContainerID> containerIDs = 
pipeline2container.get(pipelineID);
     if (containerIDs == null) {
       throw new PipelineNotFoundException(
-          String.format("%s not found", pipelineID));
+          format("%s not found", pipelineID));
     }
     return new TreeSet<>(containerIDs);
   }
@@ -334,7 +335,7 @@ class PipelineStateMap {
     Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     if (containerIDs == null) {
       throw new PipelineNotFoundException(
-          String.format("%s not found", pipelineID));
+          format("%s not found", pipelineID));
     }
     return containerIDs.size();
   }
@@ -350,8 +351,8 @@ class PipelineStateMap {
 
     Pipeline pipeline = getPipeline(pipelineID);
     if (!pipeline.isClosed()) {
-      throw new IOException(
-          String.format("Pipeline with %s is not yet closed", pipelineID));
+      throw new InvalidPipelineStateException(
+          format("Pipeline with %s is not yet closed", pipelineID));
     }
 
     pipelineMap.remove(pipelineID);
@@ -377,7 +378,7 @@ class PipelineStateMap {
     Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
     if (containerIDs == null) {
       throw new PipelineNotFoundException(
-          String.format("%s not found", pipelineID));
+          format("%s not found", pipelineID));
     }
     containerIDs.remove(containerID);
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index c75cb937e5..338fdfbdd3 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -88,6 +88,7 @@ import static 
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCAT
 import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -788,8 +789,6 @@ public class TestPipelineManagerImpl {
 
   @Test
   public void testAddContainerWithClosedPipeline() throws Exception {
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
-        captureLogs(LoggerFactory.getLogger(PipelineStateMap.class));
     SCMHADBTransactionBuffer buffer = new 
SCMHADBTransactionBufferStub(dbStore);
     PipelineManagerImpl pipelineManager =
         createPipelineManager(true, buffer);
@@ -804,10 +803,9 @@ public class TestPipelineManagerImpl {
         pipelineID.getProtobuf(), HddsProtos.PipelineState.PIPELINE_CLOSED);
     buffer.flush();
     Assertions.assertTrue(pipelineStore.get(pipelineID).isClosed());
-    pipelineManager.addContainerToPipeline(pipelineID,
-        ContainerID.valueOf(2));
-    assertTrue(logCapturer.getOutput().contains(
-        "Adding container #2 to pipeline=" + pipelineID + " in CLOSED 
state."));
+    assertThrows(InvalidPipelineStateException.class,
+        () -> pipelineManager.addContainerToPipeline(pipelineID,
+        ContainerID.valueOf(2)));
   }
 
   @Test
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestScmApplyTransactionFailure.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestScmApplyTransactionFailure.java
new file mode 100644
index 0000000000..6486307697
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestScmApplyTransactionFailure.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.pipeline.DuplicatedPipelineIdException;
+import org.apache.hadoop.hdds.scm.pipeline.InvalidPipelineStateException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test-cases to verify SCMStateMachine.applyTransaction failure scenarios.
+ */
+@Timeout(300)
+public class TestScmApplyTransactionFailure {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static StorageContainerManager scm;
+  private static ContainerManager containerManager;
+  private static PipelineManagerImpl pipelineManager;
+
+
+  @BeforeAll
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newHABuilder(conf).setSCMServiceId("test")
+        .setNumDatanodes(3).build();
+    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+        1000, TimeUnit.MILLISECONDS);
+    cluster.waitForClusterToBeReady();
+    scm = cluster.getStorageContainerManager();
+    containerManager = scm.getContainerManager();
+    pipelineManager = (PipelineManagerImpl) scm.getPipelineManager();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterAll
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAddContainerToClosedPipeline() throws Exception {
+    RatisReplicationConfig replication =
+        RatisReplicationConfig.getInstance(ReplicationFactor.THREE);
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(replication, PipelineState.OPEN);
+    Pipeline pipeline = pipelines.get(0);
+
+    // if testing for not-found pipeline, remove pipeline when closing.
+    pipelineManager.closePipeline(pipeline, true);
+
+    // adding container to a closed pipeline should yield an error.
+    ContainerInfoProto containerInfo = createContainer(pipeline);
+    StateMachineException ex = assertThrows(StateMachineException.class,
+        () -> containerManager.getContainerStateManager()
+            .addContainer(containerInfo));
+    assertTrue(ex.getCause() instanceof InvalidPipelineStateException);
+    assertThrows(ContainerNotFoundException.class,
+        () -> containerManager.getContainer(
+            new ContainerID(containerInfo.getContainerID())));
+
+    // verify that SCMStateMachine is still functioning after the rejected
+    // transaction.
+    assertNotNull(containerManager.allocateContainer(replication, "test"));
+  }
+
+  @Test
+  public void testAddDuplicatePipelineId()
+      throws Exception {
+    RatisReplicationConfig replication =
+        RatisReplicationConfig.getInstance(ReplicationFactor.THREE);
+    Pipeline existing = pipelineManager.getPipelines(
+        replication, PipelineState.OPEN).get(0);
+
+    HddsProtos.Pipeline pipelineToCreate =
+        existing.getProtobufMessage(CURRENT_VERSION);
+    StateMachineException ex = assertThrows(StateMachineException.class,
+        () -> pipelineManager.getStateManager().addPipeline(
+            pipelineToCreate));
+    assertTrue(ex.getCause() instanceof DuplicatedPipelineIdException);
+  }
+
+  private ContainerInfoProto createContainer(Pipeline pipeline) {
+    final ContainerInfoProto.Builder containerInfoBuilder =
+        ContainerInfoProto.newBuilder()
+            .setState(HddsProtos.LifeCycleState.OPEN)
+            .setPipelineID(pipeline.getId().getProtobuf())
+            .setUsedBytes(0)
+            .setNumberOfKeys(0)
+            .setStateEnterTime(Time.now())
+            .setOwner("test")
+            .setContainerID(1)
+            .setDeleteTransactionId(0)
+            .setReplicationType(pipeline.getType());
+
+    containerInfoBuilder.setReplicationFactor(
+        ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()));
+    return containerInfoBuilder.build();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to