This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a5aa549dc8 HDDS-7829. SCM to reject adding container to closed pipeline (#4212)
a5aa549dc8 is described below
commit a5aa549dc84433ac74aecf08024b4081c5a55574
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Feb 9 20:03:43 2023 -0800
HDDS-7829. SCM to reject adding container to closed pipeline (#4212)
---
.../hadoop/hdds/scm/exceptions/SCMException.java | 2 +
.../pipeline/DuplicatedPipelineIdException.java | 31 +++++
.../pipeline/InvalidPipelineStateException.java | 30 +++++
.../src/main/proto/ScmServerProtocol.proto | 2 +
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 17 ++-
.../scm/pipeline/PipelineStateManagerImpl.java | 4 +-
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 23 ++--
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 10 +-
.../container/TestScmApplyTransactionFailure.java | 147 +++++++++++++++++++++
9 files changed, 246 insertions(+), 20 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index cd1ea08ac0..7ded5f869a 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -133,5 +133,7 @@ public class SCMException extends IOException {
CONTAINER_REPLICA_NOT_FOUND,
FAILED_TO_CONNECT_TO_CRL_SERVICE,
FAILED_TO_ADD_CRL_CLIENT,
+ INVALID_PIPELINE_STATE,
+ DUPLICATED_PIPELINE_ID
}
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/DuplicatedPipelineIdException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/DuplicatedPipelineIdException.java
new file mode 100644
index 0000000000..dce3b7bbfc
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/DuplicatedPipelineIdException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+/**
+ * Signals that a pipeline id is found duplicated.
+ */
+public class DuplicatedPipelineIdException extends SCMException {
+
+ public DuplicatedPipelineIdException(String message) {
+ super(message, ResultCodes.DUPLICATED_PIPELINE_ID);
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InvalidPipelineStateException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InvalidPipelineStateException.java
new file mode 100644
index 0000000000..040db21ba2
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/InvalidPipelineStateException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+/**
+ * Signals that a pipeline state is not valid for an operation.
+ */
+public class InvalidPipelineStateException extends SCMException {
+ public InvalidPipelineStateException(String message) {
+ super(message, ResultCodes.INVALID_PIPELINE_STATE);
+ }
+}
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 90ce7f7bbf..6df2db8569 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -130,6 +130,8 @@ enum Status {
CONTAINER_REPLICA_NOT_FOUND = 37;
FAILED_TO_CONNECT_TO_CRL_SERVICE = 38;
FAILED_TO_ADD_CRL_CLIENT = 39;
+ INVALID_PIPELINE_STATE = 40;
+ DUPLICATED_PIPELINE_ID = 41;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 3d0f226460..5e5298a967 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -34,6 +34,8 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -136,7 +138,20 @@ public class SCMStateMachine extends BaseStateMachine {
try {
final SCMRatisRequest request = SCMRatisRequest.decode(
Message.valueOf(trx.getStateMachineLogEntry().getLogData()));
- applyTransactionFuture.complete(process(request));
+
+ try {
+ applyTransactionFuture.complete(process(request));
+ } catch (SCMException ex) {
+ // For SCM exceptions while applying a transaction, if the error
+ // code indicate a FATAL issue, let it crash SCM.
+ if (ex.getResult() == ResultCodes.INTERNAL_ERROR
+ || ex.getResult() == ResultCodes.IO_EXCEPTION) {
+ throw ex;
+ }
+ // Otherwise, it's considered as a logical rejection and is returned to
+ // Ratis client, leaving SCM intact.
+ applyTransactionFuture.completeExceptionally(ex);
+ }
// After previous term transactions are applied, still in safe mode,
// perform refreshAndValidate to update the safemode rule state.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
index 29a8f05712..8d9b7744c1 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
@@ -96,10 +96,10 @@ public class PipelineStateManagerImpl implements
PipelineStateManager {
try {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
if (pipelineStore != null) {
- transactionBuffer
- .addToBuffer(pipelineStore, pipeline.getId(), pipeline);
pipelineStateMap.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
+ transactionBuffer
+ .addToBuffer(pipelineStore, pipeline.getId(), pipeline);
LOG.info("Created pipeline {}.", pipeline);
}
} finally {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 5db887b674..a2c9488eef 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -39,6 +39,8 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import static java.lang.String.format;
+
/**
* Holds the data structures which maintain the information about pipeline and
* its state.
@@ -80,8 +82,8 @@ class PipelineStateMap {
if (pipelineMap.putIfAbsent(pipeline.getId(), pipeline) != null) {
LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getId());
- throw new IOException(String
- .format("Duplicate pipeline ID %s detected.", pipeline.getId()));
+ throw new DuplicatedPipelineIdException(
+ format("Duplicate pipeline ID %s detected.", pipeline.getId()));
}
pipeline2container.put(pipeline.getId(), new TreeSet<>());
if (pipeline.getPipelineState() == PipelineState.OPEN) {
@@ -106,9 +108,8 @@ class PipelineStateMap {
Pipeline pipeline = getPipeline(pipelineID);
if (pipeline.isClosed()) {
- LOG.warn("Adding container {} to pipeline={} in CLOSED state." +
- " This happens only for some exceptional cases." +
- " Check for the previous exceptions.", containerID, pipelineID);
+ throw new InvalidPipelineStateException(format(
+ "Cannot add container to pipeline=%s in closed state", pipelineID));
}
pipeline2container.get(pipelineID).add(containerID);
}
@@ -154,7 +155,7 @@ class PipelineStateMap {
Pipeline pipeline = pipelineMap.get(pipelineID);
if (pipeline == null) {
throw new PipelineNotFoundException(
- String.format("%s not found", pipelineID));
+ format("%s not found", pipelineID));
}
return pipeline;
}
@@ -314,7 +315,7 @@ class PipelineStateMap {
NavigableSet<ContainerID> containerIDs =
pipeline2container.get(pipelineID);
if (containerIDs == null) {
throw new PipelineNotFoundException(
- String.format("%s not found", pipelineID));
+ format("%s not found", pipelineID));
}
return new TreeSet<>(containerIDs);
}
@@ -334,7 +335,7 @@ class PipelineStateMap {
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
if (containerIDs == null) {
throw new PipelineNotFoundException(
- String.format("%s not found", pipelineID));
+ format("%s not found", pipelineID));
}
return containerIDs.size();
}
@@ -350,8 +351,8 @@ class PipelineStateMap {
Pipeline pipeline = getPipeline(pipelineID);
if (!pipeline.isClosed()) {
- throw new IOException(
- String.format("Pipeline with %s is not yet closed", pipelineID));
+ throw new InvalidPipelineStateException(
+ format("Pipeline with %s is not yet closed", pipelineID));
}
pipelineMap.remove(pipelineID);
@@ -377,7 +378,7 @@ class PipelineStateMap {
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
if (containerIDs == null) {
throw new PipelineNotFoundException(
- String.format("%s not found", pipelineID));
+ format("%s not found", pipelineID));
}
containerIDs.remove(containerID);
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index c75cb937e5..338fdfbdd3 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -88,6 +88,7 @@ import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -788,8 +789,6 @@ public class TestPipelineManagerImpl {
@Test
public void testAddContainerWithClosedPipeline() throws Exception {
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
- captureLogs(LoggerFactory.getLogger(PipelineStateMap.class));
SCMHADBTransactionBuffer buffer = new
SCMHADBTransactionBufferStub(dbStore);
PipelineManagerImpl pipelineManager =
createPipelineManager(true, buffer);
@@ -804,10 +803,9 @@ public class TestPipelineManagerImpl {
pipelineID.getProtobuf(), HddsProtos.PipelineState.PIPELINE_CLOSED);
buffer.flush();
Assertions.assertTrue(pipelineStore.get(pipelineID).isClosed());
- pipelineManager.addContainerToPipeline(pipelineID,
- ContainerID.valueOf(2));
- assertTrue(logCapturer.getOutput().contains(
- "Adding container #2 to pipeline=" + pipelineID + " in CLOSED state."));
+ assertThrows(InvalidPipelineStateException.class,
+ () -> pipelineManager.addContainerToPipeline(pipelineID,
+ ContainerID.valueOf(2)));
}
@Test
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestScmApplyTransactionFailure.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestScmApplyTransactionFailure.java
new file mode 100644
index 0000000000..6486307697
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestScmApplyTransactionFailure.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.pipeline.DuplicatedPipelineIdException;
+import org.apache.hadoop.hdds.scm.pipeline.InvalidPipelineStateException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test-cases to verify SCMStateMachine.applyTransaction failure scenarios.
+ */
+@Timeout(300)
+public class TestScmApplyTransactionFailure {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static StorageContainerManager scm;
+ private static ContainerManager containerManager;
+ private static PipelineManagerImpl pipelineManager;
+
+
+ @BeforeAll
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ cluster = MiniOzoneCluster.newHABuilder(conf).setSCMServiceId("test")
+ .setNumDatanodes(3).build();
+ conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+ 1000, TimeUnit.MILLISECONDS);
+ cluster.waitForClusterToBeReady();
+ scm = cluster.getStorageContainerManager();
+ containerManager = scm.getContainerManager();
+ pipelineManager = (PipelineManagerImpl) scm.getPipelineManager();
+ }
+
+ /**
+ * Shutdown MiniDFSCluster.
+ */
+ @AfterAll
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testAddContainerToClosedPipeline() throws Exception {
+ RatisReplicationConfig replication =
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE);
+ List<Pipeline> pipelines =
+ pipelineManager.getPipelines(replication, PipelineState.OPEN);
+ Pipeline pipeline = pipelines.get(0);
+
+ // if testing for not-found pipeline, remove pipeline when closing.
+ pipelineManager.closePipeline(pipeline, true);
+
+ // adding container to a closed pipeline should yield an error.
+ ContainerInfoProto containerInfo = createContainer(pipeline);
+ StateMachineException ex = assertThrows(StateMachineException.class,
+ () -> containerManager.getContainerStateManager()
+ .addContainer(containerInfo));
+ assertTrue(ex.getCause() instanceof InvalidPipelineStateException);
+ assertThrows(ContainerNotFoundException.class,
+ () -> containerManager.getContainer(
+ new ContainerID(containerInfo.getContainerID())));
+
+ // verify that SCMStateMachine is still functioning after the rejected
+ // transaction.
+ assertNotNull(containerManager.allocateContainer(replication, "test"));
+ }
+
+ @Test
+ public void testAddDuplicatePipelineId()
+ throws Exception {
+ RatisReplicationConfig replication =
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE);
+ Pipeline existing = pipelineManager.getPipelines(
+ replication, PipelineState.OPEN).get(0);
+
+ HddsProtos.Pipeline pipelineToCreate =
+ existing.getProtobufMessage(CURRENT_VERSION);
+ StateMachineException ex = assertThrows(StateMachineException.class,
+ () -> pipelineManager.getStateManager().addPipeline(
+ pipelineToCreate));
+ assertTrue(ex.getCause() instanceof DuplicatedPipelineIdException);
+ }
+
+ private ContainerInfoProto createContainer(Pipeline pipeline) {
+ final ContainerInfoProto.Builder containerInfoBuilder =
+ ContainerInfoProto.newBuilder()
+ .setState(HddsProtos.LifeCycleState.OPEN)
+ .setPipelineID(pipeline.getId().getProtobuf())
+ .setUsedBytes(0)
+ .setNumberOfKeys(0)
+ .setStateEnterTime(Time.now())
+ .setOwner("test")
+ .setContainerID(1)
+ .setDeleteTransactionId(0)
+ .setReplicationType(pipeline.getType());
+
+ containerInfoBuilder.setReplicationFactor(
+ ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()));
+ return containerInfoBuilder.build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]