This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new c957e2b  HDDS-4532. Update pipeline db when pipeline state is changed. 
(#1785)
c957e2b is described below

commit c957e2bb4ce0b9811e7080d41746881fd13b314c
Author: bshashikant <[email protected]>
AuthorDate: Tue Jan 19 07:53:37 2021 +0530

    HDDS-4532. Update pipeline db when pipeline state is changed. (#1785)
---
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   |  5 +++
 .../scm/pipeline/PipelineStateManagerV2Impl.java   | 44 +++++++++++++++++-----
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  6 +--
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 20 +++++++---
 4 files changed, 56 insertions(+), 19 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 7d6c88a..823c9bd 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -511,6 +511,11 @@ public final class PipelineManagerV2Impl implements 
PipelineManager {
 
     // shutdown pipeline provider.
     pipelineFactory.shutdown();
+    try {
+      stateManager.close();
+    } catch (Exception ex) {
+      LOG.error("PipelineStateManager close failed", ex);
+    }
   }
 
   @Override
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index c9fb002..06bae4c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -51,7 +51,7 @@ public class PipelineStateManagerV2Impl implements 
StateManager {
 
   private final PipelineStateMap pipelineStateMap;
   private final NodeManager nodeManager;
-  private final Table<PipelineID, Pipeline> pipelineStore;
+  private Table<PipelineID, Pipeline> pipelineStore;
 
   // Protect potential contentions between RaftServer and PipelineManager.
   // See https://issues.apache.org/jira/browse/HDDS-4560
@@ -89,10 +89,12 @@ public class PipelineStateManagerV2Impl implements 
StateManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
-      pipelineStore.put(pipeline.getId(), pipeline);
-      pipelineStateMap.addPipeline(pipeline);
-      nodeManager.addPipeline(pipeline);
-      LOG.info("Created pipeline {}.", pipeline);
+      if (pipelineStore != null) {
+        pipelineStore.put(pipeline.getId(), pipeline);
+        pipelineStateMap.addPipeline(pipeline);
+        nodeManager.addPipeline(pipeline);
+        LOG.info("Created pipeline {}.", pipeline);
+      }
     } finally {
       lock.writeLock().unlock();
     }
@@ -216,7 +218,9 @@ public class PipelineStateManagerV2Impl implements 
StateManager {
     lock.writeLock().lock();
     try {
       PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
-      pipelineStore.delete(pipelineID);
+      if (pipelineStore != null) {
+        pipelineStore.delete(pipelineID);
+      }
       Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
       nodeManager.removePipeline(pipeline);
       LOG.info("Pipeline {} removed.", pipeline);
@@ -241,11 +245,23 @@ public class PipelineStateManagerV2Impl implements 
StateManager {
   public void updatePipelineState(
       HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
       throws IOException {
+    PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
+    Pipeline.PipelineState oldState =
+        getPipeline(pipelineID).getPipelineState();
     lock.writeLock().lock();
     try {
-      pipelineStateMap.updatePipelineState(
-          PipelineID.getFromProtobuf(pipelineIDProto),
-          Pipeline.PipelineState.fromProtobuf(newState));
+      // null check is here to prevent the case where SCM store
+      // is closed but the staleNode handlers/pipeline creations
+      // still try to access it.
+      if (pipelineStore != null) {
+        pipelineStateMap.updatePipelineState(pipelineID,
+            Pipeline.PipelineState.fromProtobuf(newState));
+        pipelineStore.put(pipelineID, getPipeline(pipelineID));
+      }
+    } catch (IOException ex) {
+      LOG.warn("Pipeline {} state update failed", pipelineID);
+      // revert back to old state in memory
+      pipelineStateMap.updatePipelineState(pipelineID, oldState);
     } finally {
       lock.writeLock().unlock();
     }
@@ -253,7 +269,15 @@ public class PipelineStateManagerV2Impl implements 
StateManager {
 
   @Override
   public void close() throws Exception {
-    pipelineStore.close();
+    lock.writeLock().lock();
+    try {
+      pipelineStore.close();
+      pipelineStore = null;
+    } catch (Exception ex) {
+      LOG.error("Pipeline  store close failed", ex);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   // TODO Remove legacy
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 1e5d505..dcae32e 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -407,7 +407,7 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   private void updatePipelineStateInDb(PipelineID pipelineId,
-                                       Pipeline.PipelineState state)
+                                       Pipeline.PipelineState oldState)
           throws IOException {
     // null check is here to prevent the case where SCM store
     // is closed but the staleNode handlers/pipleine creations
@@ -416,9 +416,9 @@ public class SCMPipelineManager implements PipelineManager {
       try {
         pipelineStore.put(pipelineId, getPipeline(pipelineId));
       } catch (IOException ex) {
-        LOG.info("Pipeline {} state update failed", pipelineId);
+        LOG.warn("Pipeline {} state update failed", pipelineId);
         // revert back to old state in memory
-        stateManager.updatePipelineState(pipelineId, state);
+        stateManager.updatePipelineState(pipelineId, oldState);
       }
     }
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 2415b2b..10d978e 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -119,13 +120,13 @@ public class TestPipelineManagerImpl {
 
     PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true);
     // Should be able to load previous pipelines.
-    Assert.assertFalse(pipelineManager.getPipelines().isEmpty());
+    Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
     Assert.assertEquals(2, pipelineManager.getPipelines().size());
-    pipelineManager.allowPipelineCreation();
-    Pipeline pipeline3 = pipelineManager.createPipeline(
+    pipelineManager2.allowPipelineCreation();
+    Pipeline pipeline3 = pipelineManager2.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
-    Assert.assertEquals(3, pipelineManager.getPipelines().size());
-    Assert.assertTrue(pipelineManager.containsPipeline(pipeline3.getId()));
+    Assert.assertEquals(3, pipelineManager2.getPipelines().size());
+    Assert.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));
 
     pipelineManager2.close();
   }
@@ -149,12 +150,16 @@ public class TestPipelineManagerImpl {
   @Test
   public void testUpdatePipelineStates() throws Exception {
     PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    Table<PipelineID, Pipeline> pipelineStore =
+        SCMDBDefinition.PIPELINES.getTable(dbStore);
     pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
     Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
     Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    Assert.assertEquals(ALLOCATED,
+        pipelineStore.get(pipeline.getId()).getPipelineState());
     PipelineID pipelineID = pipeline.getId();
 
     pipelineManager.openPipeline(pipelineID);
@@ -163,10 +168,13 @@ public class TestPipelineManagerImpl {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE,
             Pipeline.PipelineState.OPEN).contains(pipeline));
+    Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
 
     pipelineManager.deactivatePipeline(pipeline.getId());
     Assert.assertEquals(Pipeline.PipelineState.DORMANT,
         pipelineManager.getPipeline(pipelineID).getPipelineState());
+    Assert.assertEquals(Pipeline.PipelineState.DORMANT,
+        pipelineStore.get(pipeline.getId()).getPipelineState());
     Assert.assertFalse(pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE,
@@ -177,7 +185,7 @@ public class TestPipelineManagerImpl {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE,
             Pipeline.PipelineState.OPEN).contains(pipeline));
-
+    Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
     pipelineManager.close();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to