This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new c957e2b HDDS-4532. Update pipeline db when pipeline state is changed.
(#1785)
c957e2b is described below
commit c957e2bb4ce0b9811e7080d41746881fd13b314c
Author: bshashikant <[email protected]>
AuthorDate: Tue Jan 19 07:53:37 2021 +0530
HDDS-4532. Update pipeline db when pipeline state is changed. (#1785)
---
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 5 +++
.../scm/pipeline/PipelineStateManagerV2Impl.java | 44 +++++++++++++++++-----
.../hdds/scm/pipeline/SCMPipelineManager.java | 6 +--
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 20 +++++++---
4 files changed, 56 insertions(+), 19 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 7d6c88a..823c9bd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -511,6 +511,11 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
// shutdown pipeline provider.
pipelineFactory.shutdown();
+ try {
+ stateManager.close();
+ } catch (Exception ex) {
+ LOG.error("PipelineStateManager close failed", ex);
+ }
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index c9fb002..06bae4c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -51,7 +51,7 @@ public class PipelineStateManagerV2Impl implements
StateManager {
private final PipelineStateMap pipelineStateMap;
private final NodeManager nodeManager;
- private final Table<PipelineID, Pipeline> pipelineStore;
+ private Table<PipelineID, Pipeline> pipelineStore;
// Protect potential contentions between RaftServer and PipelineManager.
// See https://issues.apache.org/jira/browse/HDDS-4560
@@ -89,10 +89,12 @@ public class PipelineStateManagerV2Impl implements
StateManager {
lock.writeLock().lock();
try {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
- pipelineStore.put(pipeline.getId(), pipeline);
- pipelineStateMap.addPipeline(pipeline);
- nodeManager.addPipeline(pipeline);
- LOG.info("Created pipeline {}.", pipeline);
+ if (pipelineStore != null) {
+ pipelineStore.put(pipeline.getId(), pipeline);
+ pipelineStateMap.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
+ LOG.info("Created pipeline {}.", pipeline);
+ }
} finally {
lock.writeLock().unlock();
}
@@ -216,7 +218,9 @@ public class PipelineStateManagerV2Impl implements
StateManager {
lock.writeLock().lock();
try {
PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
- pipelineStore.delete(pipelineID);
+ if (pipelineStore != null) {
+ pipelineStore.delete(pipelineID);
+ }
Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
nodeManager.removePipeline(pipeline);
LOG.info("Pipeline {} removed.", pipeline);
@@ -241,11 +245,23 @@ public class PipelineStateManagerV2Impl implements
StateManager {
public void updatePipelineState(
HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
throws IOException {
+ PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
+ Pipeline.PipelineState oldState =
+ getPipeline(pipelineID).getPipelineState();
lock.writeLock().lock();
try {
- pipelineStateMap.updatePipelineState(
- PipelineID.getFromProtobuf(pipelineIDProto),
- Pipeline.PipelineState.fromProtobuf(newState));
+ // null check is here to prevent the case where SCM store
+ // is closed but the staleNode handlers/pipeline creations
+ // still try to access it.
+ if (pipelineStore != null) {
+ pipelineStateMap.updatePipelineState(pipelineID,
+ Pipeline.PipelineState.fromProtobuf(newState));
+ pipelineStore.put(pipelineID, getPipeline(pipelineID));
+ }
+ } catch (IOException ex) {
+ LOG.warn("Pipeline {} state update failed", pipelineID);
+ // revert back to old state in memory
+ pipelineStateMap.updatePipelineState(pipelineID, oldState);
} finally {
lock.writeLock().unlock();
}
@@ -253,7 +269,15 @@ public class PipelineStateManagerV2Impl implements
StateManager {
@Override
public void close() throws Exception {
- pipelineStore.close();
+ lock.writeLock().lock();
+ try {
+ pipelineStore.close();
+ pipelineStore = null;
+ } catch (Exception ex) {
+ LOG.error("Pipeline store close failed", ex);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
// TODO Remove legacy
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 1e5d505..dcae32e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -407,7 +407,7 @@ public class SCMPipelineManager implements PipelineManager {
}
private void updatePipelineStateInDb(PipelineID pipelineId,
- Pipeline.PipelineState state)
+ Pipeline.PipelineState oldState)
throws IOException {
// null check is here to prevent the case where SCM store
// is closed but the staleNode handlers/pipleine creations
@@ -416,9 +416,9 @@ public class SCMPipelineManager implements PipelineManager {
try {
pipelineStore.put(pipelineId, getPipeline(pipelineId));
} catch (IOException ex) {
- LOG.info("Pipeline {} state update failed", pipelineId);
+ LOG.warn("Pipeline {} state update failed", pipelineId);
// revert back to old state in memory
- stateManager.updatePipelineState(pipelineId, state);
+ stateManager.updatePipelineState(pipelineId, oldState);
}
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 2415b2b..10d978e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -34,6 +34,7 @@ import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -119,13 +120,13 @@ public class TestPipelineManagerImpl {
PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true);
// Should be able to load previous pipelines.
- Assert.assertFalse(pipelineManager.getPipelines().isEmpty());
+ Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
Assert.assertEquals(2, pipelineManager.getPipelines().size());
- pipelineManager.allowPipelineCreation();
- Pipeline pipeline3 = pipelineManager.createPipeline(
+ pipelineManager2.allowPipelineCreation();
+ Pipeline pipeline3 = pipelineManager2.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
- Assert.assertEquals(3, pipelineManager.getPipelines().size());
- Assert.assertTrue(pipelineManager.containsPipeline(pipeline3.getId()));
+ Assert.assertEquals(3, pipelineManager2.getPipelines().size());
+ Assert.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));
pipelineManager2.close();
}
@@ -149,12 +150,16 @@ public class TestPipelineManagerImpl {
@Test
public void testUpdatePipelineStates() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ Table<PipelineID, Pipeline> pipelineStore =
+ SCMDBDefinition.PIPELINES.getTable(dbStore);
pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ Assert.assertEquals(ALLOCATED,
+ pipelineStore.get(pipeline.getId()).getPipelineState());
PipelineID pipelineID = pipeline.getId();
pipelineManager.openPipeline(pipelineID);
@@ -163,10 +168,13 @@ public class TestPipelineManagerImpl {
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
+ Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
pipelineManager.deactivatePipeline(pipeline.getId());
Assert.assertEquals(Pipeline.PipelineState.DORMANT,
pipelineManager.getPipeline(pipelineID).getPipelineState());
+ Assert.assertEquals(Pipeline.PipelineState.DORMANT,
+ pipelineStore.get(pipeline.getId()).getPipelineState());
Assert.assertFalse(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
@@ -177,7 +185,7 @@ public class TestPipelineManagerImpl {
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
-
+ Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
pipelineManager.close();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]