This is an automated email from the ASF dual-hosted git repository.
devesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2aa561783d HDDS-10880. Duplicate Pipeline ID Detected in
ReconContainerManager. (#6742)
2aa561783d is described below
commit 2aa561783d025ea2528eca56d1592f2a4466511f
Author: Arafat2198 <[email protected]>
AuthorDate: Thu Jun 13 08:00:12 2024 +0530
HDDS-10880. Duplicate Pipeline ID Detected in ReconContainerManager. (#6742)
Thanks @adoroszlai , @dombizita @raju-balpande for review. Thanks
@ArafatKhan2198 for working on this patch.
---
.../ozone/recon/scm/ReconContainerManager.java | 8 +--
.../ozone/recon/scm/ReconPipelineManager.java | 79 +++++++++++-----------
.../recon/scm/ReconPipelineReportHandler.java | 7 +-
.../ozone/recon/scm/TestReconPipelineManager.java | 36 ++++++++--
4 files changed, 78 insertions(+), 52 deletions(-)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 9d7c88dfc4..7bf51fcc4d 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -229,11 +229,9 @@ public class ReconContainerManager extends
ContainerManagerImpl {
try {
if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
PipelineID pipelineID = containerWithPipeline.getPipeline().getId();
- // Check if the pipeline is present in Recon
- if (!pipelineManager.containsPipeline(pipelineID)) {
- // Pipeline is not present, add it first.
- LOG.info("Adding new pipeline {} from SCM.", pipelineID);
-
reconPipelineManager.addPipeline(containerWithPipeline.getPipeline());
+ // Check if the pipeline is present in Recon if not add it.
+ if
(reconPipelineManager.addPipeline(containerWithPipeline.getPipeline())) {
+ LOG.info("Added new pipeline {} to Recon pipeline metadata from
SCM.", pipelineID);
}
getContainerStateManager().addContainer(containerInfo.getProtobuf());
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index fab30cf20b..6bc54f3e6e 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -104,12 +104,11 @@ public final class ReconPipelineManager extends
PipelineManagerImpl {
List<Pipeline> pipelinesInHouse = getPipelines();
LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
for (Pipeline pipeline : pipelinesFromScm) {
- if (!containsPipeline(pipeline.getId())) {
- // New pipeline got from SCM. Recon does not know anything about it,
- // so let's add it.
- LOG.info("Adding new pipeline {} from SCM.", pipeline.getId());
- addPipeline(pipeline);
+ // New pipeline got from SCM. Validate If it doesn't exist at Recon,
try adding it.
+ if (addPipeline(pipeline)) {
+ LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
} else {
+ LOG.warn("Pipeline {} already exists in Recon pipeline metadata.",
pipeline.getId());
// Recon already has this pipeline. Just update state and creation
// time.
getStateManager().updatePipelineState(
@@ -118,60 +117,60 @@ public final class ReconPipelineManager extends
PipelineManagerImpl {
getPipeline(pipeline.getId()).setCreationTimestamp(
pipeline.getCreationTimestamp());
}
- removeInvalidPipelines(pipelinesFromScm);
}
+ removeInvalidPipelines(pipelinesFromScm);
} finally {
releaseWriteLock();
}
}
public void removeInvalidPipelines(List<Pipeline> pipelinesFromScm) {
- acquireWriteLock();
- try {
- List<Pipeline> pipelinesInHouse = getPipelines();
- // Removing pipelines in Recon that are no longer in SCM.
- // TODO Recon may need to track inactive pipelines as well. So this can
be
- // removed in a followup JIRA.
- List<Pipeline> invalidPipelines = pipelinesInHouse
- .stream()
- .filter(p -> !pipelinesFromScm.contains(p))
- .collect(Collectors.toList());
- invalidPipelines.forEach(p -> {
- PipelineID pipelineID = p.getId();
- if (!p.getPipelineState().equals(CLOSED)) {
- try {
- getStateManager().updatePipelineState(
- pipelineID.getProtobuf(),
- HddsProtos.PipelineState.PIPELINE_CLOSED);
- } catch (IOException e) {
- LOG.warn("Pipeline {} not found while updating state. ",
- p.getId(), e);
- }
- }
+ List<Pipeline> pipelinesInHouse = getPipelines();
+ // Removing pipelines in Recon that are no longer in SCM.
+ // TODO Recon may need to track inactive pipelines as well. So this can be
+ // removed in a followup JIRA.
+ List<Pipeline> invalidPipelines = pipelinesInHouse
+ .stream()
+ .filter(p -> !pipelinesFromScm.contains(p))
+ .collect(Collectors.toList());
+ invalidPipelines.forEach(p -> {
+ PipelineID pipelineID = p.getId();
+ if (!p.getPipelineState().equals(CLOSED)) {
try {
- LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
- closePipeline(p.getId());
- deletePipeline(p.getId());
+ getStateManager().updatePipelineState(
+ pipelineID.getProtobuf(),
+ HddsProtos.PipelineState.PIPELINE_CLOSED);
} catch (IOException e) {
- LOG.warn("Unable to remove pipeline {}", pipelineID, e);
+ LOG.warn("Pipeline {} not found while updating state. ",
+ p.getId(), e);
}
- });
- } finally {
- releaseWriteLock();
- }
+ }
+ try {
+ LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
+ closePipeline(p.getId());
+ deletePipeline(p.getId());
+ } catch (IOException e) {
+ LOG.warn("Unable to remove pipeline {}", pipelineID, e);
+ }
+ });
}
+
/**
* Add a new pipeline to the pipeline metadata.
* @param pipeline pipeline
+ * @return true if the pipeline was added, false if it already existed
* @throws IOException
*/
@VisibleForTesting
- public void addPipeline(Pipeline pipeline)
- throws IOException {
+ public boolean addPipeline(Pipeline pipeline) throws IOException {
acquireWriteLock();
try {
- getStateManager().addPipeline(
- pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+ // Check if the pipeline already exists
+ if (containsPipeline(pipeline.getId())) {
+ return false;
+ }
+
getStateManager().addPipeline(pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+ return true;
} finally {
releaseWriteLock();
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
index bf280b37e7..0f3ea4d9ca 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
@@ -83,9 +83,10 @@ public class ReconPipelineReportHandler extends
PipelineReportHandler {
throw ex;
}
- LOG.info("Adding new pipeline {} to Recon pipeline metadata.",
- pipelineFromScm);
- reconPipelineManager.addPipeline(pipelineFromScm);
+ if (reconPipelineManager.addPipeline(pipelineFromScm)) {
+ LOG.info("Pipeline {} verified from SCM and added to Recon pipeline
metadata.",
+ pipelineFromScm);
+ }
}
Pipeline pipeline;
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
index 92af652c25..d723ee75e8 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
@@ -58,16 +58,17 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -208,6 +209,33 @@ public class TestReconPipelineManager {
assertTrue(reconPipelineManager.containsPipeline(pipeline.getId()));
}
+ @Test
+ public void testDuplicatePipelineHandling() throws IOException {
+ Pipeline pipeline = getRandomPipeline();
+ NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
+ EventQueue eventQueue = new EventQueue();
+ versionManager = mock(HDDSLayoutVersionManager.class);
+
when(versionManager.getMetadataLayoutVersion()).thenReturn(maxLayoutVersion());
+
when(versionManager.getSoftwareLayoutVersion()).thenReturn(maxLayoutVersion());
+ NodeManager nodeManager =
+ new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap,
+ SCMContext.emptyContext(), versionManager);
+
+ ReconPipelineManager reconPipelineManager =
+ ReconPipelineManager.newReconPipelineManager(conf, nodeManager,
+ ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue,
+ scmhaManager, scmContext);
+
+ // Add the pipeline for the first time
+ reconPipelineManager.addPipeline(pipeline);
+
+ // Attempt to add the same pipeline again and ensure no exception is thrown
+ assertDoesNotThrow(() -> {
+ reconPipelineManager.addPipeline(pipeline);
+ }, "Exception was thrown when adding a duplicate pipeline.");
+ }
+
+
@Test
public void testStubbedReconPipelineFactory() throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]