This is an automated email from the ASF dual-hosted git repository.

devesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aa561783d HDDS-10880. Duplicate Pipeline ID Detected in 
ReconContainerManager. (#6742)
2aa561783d is described below

commit 2aa561783d025ea2528eca56d1592f2a4466511f
Author: Arafat2198 <[email protected]>
AuthorDate: Thu Jun 13 08:00:12 2024 +0530

    HDDS-10880. Duplicate Pipeline ID Detected in ReconContainerManager. (#6742)
    
    Thanks @adoroszlai , @dombizita @raju-balpande for review. Thanks 
@ArafatKhan2198 for working on this patch.
---
 .../ozone/recon/scm/ReconContainerManager.java     |  8 +--
 .../ozone/recon/scm/ReconPipelineManager.java      | 79 +++++++++++-----------
 .../recon/scm/ReconPipelineReportHandler.java      |  7 +-
 .../ozone/recon/scm/TestReconPipelineManager.java  | 36 ++++++++--
 4 files changed, 78 insertions(+), 52 deletions(-)

diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 9d7c88dfc4..7bf51fcc4d 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -229,11 +229,9 @@ public class ReconContainerManager extends 
ContainerManagerImpl {
     try {
       if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
         PipelineID pipelineID = containerWithPipeline.getPipeline().getId();
-        // Check if the pipeline is present in Recon
-        if (!pipelineManager.containsPipeline(pipelineID)) {
-          // Pipeline is not present, add it first.
-          LOG.info("Adding new pipeline {} from SCM.", pipelineID);
-          
reconPipelineManager.addPipeline(containerWithPipeline.getPipeline());
+        // Check if the pipeline is present in Recon if not add it.
+        if 
(reconPipelineManager.addPipeline(containerWithPipeline.getPipeline())) {
+          LOG.info("Added new pipeline {} to Recon pipeline metadata from 
SCM.", pipelineID);
         }
 
         getContainerStateManager().addContainer(containerInfo.getProtobuf());
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index fab30cf20b..6bc54f3e6e 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -104,12 +104,11 @@ public final class ReconPipelineManager extends 
PipelineManagerImpl {
       List<Pipeline> pipelinesInHouse = getPipelines();
       LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
       for (Pipeline pipeline : pipelinesFromScm) {
-        if (!containsPipeline(pipeline.getId())) {
-          // New pipeline got from SCM. Recon does not know anything about it,
-          // so let's add it.
-          LOG.info("Adding new pipeline {} from SCM.", pipeline.getId());
-          addPipeline(pipeline);
+        // New pipeline got from SCM. Validate If it doesn't exist at Recon, 
try adding it.
+        if (addPipeline(pipeline)) {
+          LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
         } else {
+          LOG.warn("Pipeline {} already exists in Recon pipeline metadata.", 
pipeline.getId());
           // Recon already has this pipeline. Just update state and creation
           // time.
           getStateManager().updatePipelineState(
@@ -118,60 +117,60 @@ public final class ReconPipelineManager extends 
PipelineManagerImpl {
           getPipeline(pipeline.getId()).setCreationTimestamp(
               pipeline.getCreationTimestamp());
         }
-        removeInvalidPipelines(pipelinesFromScm);
       }
+      removeInvalidPipelines(pipelinesFromScm);
     } finally {
       releaseWriteLock();
     }
   }
 
   public void removeInvalidPipelines(List<Pipeline> pipelinesFromScm) {
-    acquireWriteLock();
-    try {
-      List<Pipeline> pipelinesInHouse = getPipelines();
-      // Removing pipelines in Recon that are no longer in SCM.
-      // TODO Recon may need to track inactive pipelines as well. So this can 
be
-      // removed in a followup JIRA.
-      List<Pipeline> invalidPipelines = pipelinesInHouse
-          .stream()
-          .filter(p -> !pipelinesFromScm.contains(p))
-          .collect(Collectors.toList());
-      invalidPipelines.forEach(p -> {
-        PipelineID pipelineID = p.getId();
-        if (!p.getPipelineState().equals(CLOSED)) {
-          try {
-            getStateManager().updatePipelineState(
-                pipelineID.getProtobuf(),
-                HddsProtos.PipelineState.PIPELINE_CLOSED);
-          } catch (IOException e) {
-            LOG.warn("Pipeline {} not found while updating state. ",
-                p.getId(), e);
-          }
-        }
+    List<Pipeline> pipelinesInHouse = getPipelines();
+    // Removing pipelines in Recon that are no longer in SCM.
+    // TODO Recon may need to track inactive pipelines as well. So this can be
+    // removed in a followup JIRA.
+    List<Pipeline> invalidPipelines = pipelinesInHouse
+        .stream()
+        .filter(p -> !pipelinesFromScm.contains(p))
+        .collect(Collectors.toList());
+    invalidPipelines.forEach(p -> {
+      PipelineID pipelineID = p.getId();
+      if (!p.getPipelineState().equals(CLOSED)) {
         try {
-          LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
-          closePipeline(p.getId());
-          deletePipeline(p.getId());
+          getStateManager().updatePipelineState(
+              pipelineID.getProtobuf(),
+              HddsProtos.PipelineState.PIPELINE_CLOSED);
         } catch (IOException e) {
-          LOG.warn("Unable to remove pipeline {}", pipelineID, e);
+          LOG.warn("Pipeline {} not found while updating state. ",
+              p.getId(), e);
         }
-      });
-    } finally {
-      releaseWriteLock();
-    }
+      }
+      try {
+        LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
+        closePipeline(p.getId());
+        deletePipeline(p.getId());
+      } catch (IOException e) {
+        LOG.warn("Unable to remove pipeline {}", pipelineID, e);
+      }
+    });
   }
+
   /**
    * Add a new pipeline to the pipeline metadata.
    * @param pipeline pipeline
+   * @return true if the pipeline was added, false if it already existed
    * @throws IOException
    */
   @VisibleForTesting
-  public void addPipeline(Pipeline pipeline)
-      throws IOException {
+  public boolean addPipeline(Pipeline pipeline) throws IOException {
     acquireWriteLock();
     try {
-      getStateManager().addPipeline(
-          pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+      // Check if the pipeline already exists
+      if (containsPipeline(pipeline.getId())) {
+        return false;
+      }
+      
getStateManager().addPipeline(pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
+      return true;
     } finally {
       releaseWriteLock();
     }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
index bf280b37e7..0f3ea4d9ca 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
@@ -83,9 +83,10 @@ public class ReconPipelineReportHandler extends 
PipelineReportHandler {
         throw ex;
       }
 
-      LOG.info("Adding new pipeline {} to Recon pipeline metadata.",
-          pipelineFromScm);
-      reconPipelineManager.addPipeline(pipelineFromScm);
+      if (reconPipelineManager.addPipeline(pipelineFromScm)) {
+        LOG.info("Pipeline {} verified from SCM and added to Recon pipeline 
metadata.",
+            pipelineFromScm);
+      }
     }
 
     Pipeline pipeline;
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
index 92af652c25..d723ee75e8 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
@@ -58,16 +58,17 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
 import static 
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -208,6 +209,33 @@ public class TestReconPipelineManager {
     assertTrue(reconPipelineManager.containsPipeline(pipeline.getId()));
   }
 
+  @Test
+  public void testDuplicatePipelineHandling() throws IOException {
+    Pipeline pipeline = getRandomPipeline();
+    NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
+    EventQueue eventQueue = new EventQueue();
+    versionManager = mock(HDDSLayoutVersionManager.class);
+    
when(versionManager.getMetadataLayoutVersion()).thenReturn(maxLayoutVersion());
+    
when(versionManager.getSoftwareLayoutVersion()).thenReturn(maxLayoutVersion());
+    NodeManager nodeManager =
+        new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap,
+            SCMContext.emptyContext(), versionManager);
+
+    ReconPipelineManager reconPipelineManager =
+        ReconPipelineManager.newReconPipelineManager(conf, nodeManager,
+            ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue,
+            scmhaManager, scmContext);
+
+    // Add the pipeline for the first time
+    reconPipelineManager.addPipeline(pipeline);
+
+    // Attempt to add the same pipeline again and ensure no exception is thrown
+    assertDoesNotThrow(() -> {
+      reconPipelineManager.addPipeline(pipeline);
+    }, "Exception was thrown when adding a duplicate pipeline.");
+  }
+
+
   @Test
   public void testStubbedReconPipelineFactory() throws IOException {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to