This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-nonrolling-upgrade by this push:
     new c4b7e7b  HDDS-4828. SCM should go into "safe mode" until there is at least 1 pipeline to work with after finalization. (#2101)
c4b7e7b is described below

commit c4b7e7b9de9aec31478859968a1f1efa54ce21ac
Author: Ethan Rose <[email protected]>
AuthorDate: Fri Apr 9 17:44:04 2021 -0400

    HDDS-4828. SCM should go into "safe mode" until there is at least 1 pipeline to work with after finalization. (#2101)
---
 .../common/statemachine/StateContext.java          |  3 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |  4 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 24 ++++---
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   |  9 +++
 .../hdds/scm/server/StorageContainerManager.java   | 21 ++++++
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       | 76 +++++++++++++---------
 6 files changed, 97 insertions(+), 40 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 3051638..d45fef2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -608,7 +608,7 @@ public class StateContext {
    * termOfLeaderSCM with the max term found in commandQueue.
    *
    * The init process also works for non-HA mode. In that case, term of all
-   * SCMCommands will be 0.
+   * SCMCommands will be SCMContext.INVALID_TERM.
    */
   private void initTermOfLeaderSCM() {
     // only init once
@@ -653,7 +653,6 @@ public class StateContext {
       LOG.error("should init termOfLeaderSCM before update it.");
       return;
     }
-
     termOfLeaderSCM = Optional.of(
         Long.max(termOfLeaderSCM.get(), command.getTerm()));
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index d3ffba8..a381c1b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -400,6 +400,10 @@ public class HeartbeatEndpointTask
         FinalizeNewLayoutVersionCommand finalizeNewLayoutVersionCommand =
             FinalizeNewLayoutVersionCommand.getFromProtobuf(
                 
commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
+        if (commandResponseProto.hasTerm()) {
+          finalizeNewLayoutVersionCommand.setTerm(
+              commandResponseProto.getTerm());
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM finalize command {}",
               finalizeNewLayoutVersionCommand.getId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 2e10245..fced3c3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -578,14 +578,22 @@ public class SCMNodeManager implements NodeManager {
               "MetadataLayoutVersion = {}",
           datanodeDetails.getHostName(), dnMlv, scmMlv);
 
-      // Send Finalize command to the data node. Its OK to
-      // send Finalize command multiple times.
-      scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-          new CommandForDatanode<>(datanodeDetails.getUuid(),
-              new FinalizeNewLayoutVersionCommand(true,
-                  LayoutVersionProto.newBuilder()
-                      .setSoftwareLayoutVersion(dnSlv)
-                      .setMetadataLayoutVersion(dnSlv).build())));
+      FinalizeNewLayoutVersionCommand finalizeCmd =
+          new FinalizeNewLayoutVersionCommand(true,
+          LayoutVersionProto.newBuilder()
+              .setSoftwareLayoutVersion(dnSlv)
+              .setMetadataLayoutVersion(dnSlv).build());
+      try {
+        finalizeCmd.setTerm(scmContext.getTermOfLeader());
+
+        // Send Finalize command to the data node. Its OK to
+        // send Finalize command multiple times.
+        scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+            new CommandForDatanode<>(datanodeDetails.getUuid(), finalizeCmd));
+      } catch(NotLeaderException ex) {
+        LOG.warn("Skip sending finalize upgrade command since current SCM is" +
+            "not leader.", ex);
+      }
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 752aa9f..05a130e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -105,6 +105,7 @@ public class PipelineManagerV2Impl implements PipelineManager {
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
         HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.freezePipelineCreation = new AtomicBoolean();
   }
 
   public static PipelineManagerV2Impl newPipelineManager(
@@ -151,6 +152,14 @@ public class PipelineManagerV2Impl implements PipelineManager {
       throw new IOException("Pipeline creation is not allowed as safe mode " +
           "prechecks have not yet passed");
     }
+
+    if (freezePipelineCreation.get()) {
+      String message = "Cannot create new pipelines while pipeline creation " +
+          "is frozen.";
+      LOG.info(message);
+      throw new IOException(message);
+    }
+
     lock.lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index a3252c8..f6d4eb9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1745,6 +1745,27 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
      * find enough Healthy data nodes.
      */
     pipelineManager.resumePipelineCreation();
+
+    // Wait for at least one pipeline to be created before finishing
+    // finalization, so clients can write.
+    boolean hasPipeline = false;
+    while (!hasPipeline) {
+      int pipelineCount = pipelineManager.getPipelines(
+          HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
+          Pipeline.PipelineState.OPEN).size();
+
+      hasPipeline = (pipelineCount >= 1);
+      if (!hasPipeline) {
+        LOG.info("Waiting for at least one pipeline after SCM finalization.");
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          // Try again on next loop iteration.
+        }
+      } else {
+        LOG.info("Pipeline found after SCM finalization");
+      }
+    }
   }
 
   public StatusAndMessages finalizeUpgrade(String upgradeClientID)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
index 08977f6..b5543d1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVA
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
-import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
@@ -32,11 +31,16 @@ import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATI
 import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -54,6 +58,7 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
@@ -62,7 +67,6 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -102,6 +106,7 @@ public class TestHDDSUpgrade {
     conf = new OzoneConfiguration();
     conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1000,
             TimeUnit.MILLISECONDS);
+
     conf.set(OZONE_DATANODE_PIPELINE_LIMIT, "1");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(NUM_DATA_NODES)
@@ -142,6 +147,18 @@ public class TestHDDSUpgrade {
     Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
         scmVersionManager.getMetadataLayoutVersion());
     Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
+
+    // SCM should not return from finalization until there is at least one
+    // pipeline to use.
+    int pipelineCount = scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
+        .size();
+    Assert.assertTrue(pipelineCount >= 1);
+
+    // SCM will not return from finalization until there is at least one
+    // RATIS 3 pipeline. For this to exist, all three of our datanodes must
+    // be in the HEALTHY state.
+    testDataNodesStateOnSCM(HEALTHY);
+
     int countContainers = 0;
     for (ContainerInfo ci : scmContainerManager.getContainers()) {
       HddsProtos.LifeCycleState ciState = ci.getState();
@@ -218,17 +235,6 @@ public class TestHDDSUpgrade {
     Assert.assertTrue(countContainers >= 1);
   }
 
-  private void testPostUpgradePipelineCreation() throws IOException {
-    ratisPipeline1 = scmPipelineManager.createPipeline(RATIS, THREE);
-    scmPipelineManager.openPipeline(ratisPipeline1.getId());
-    Assert.assertEquals(0,
-        scmPipelineManager.getNumberOfContainers(ratisPipeline1.getId()));
-    PipelineID pid = scmContainerManager.allocateContainer(RATIS, THREE,
-        "Owner1").getPipelineID();
-    Assert.assertEquals(1, scmPipelineManager.getNumberOfContainers(pid));
-    Assert.assertEquals(pid, ratisPipeline1.getId());
-  }
-
   private void testDataNodesStateOnSCM(NodeState state) {
     int countNodes = 0;
     for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()){
@@ -252,7 +258,6 @@ public class TestHDDSUpgrade {
     });
   }
 
-  @Ignore("Needs PipelineManager logic refactor after SCM HA merge.")
   @Test
   public void testFinalizationFromInitialVersionToLatestVersion()
       throws Exception {
@@ -275,6 +280,12 @@ public class TestHDDSUpgrade {
     testPreUpgradeConditionsSCM();
     testPreUpgradeConditionsDataNodes();
 
+    Set<PipelineID> preUpgradeOpenPipelines =
+        scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
+            .stream()
+            .map(Pipeline::getId)
+            .collect(Collectors.toSet());
+
     // Trigger Finalization on the SCM
     StatusAndMessages status = scm.finalizeUpgrade("xyz");
     Assert.assertEquals(STARTING_FINALIZATION, status.status());
@@ -284,27 +295,32 @@ public class TestHDDSUpgrade {
       status = scm.queryUpgradeFinalizationProgress("xyz", false);
     }
 
+    Set<PipelineID> postUpgradeOpenPipelines =
+        scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
+            .stream()
+            .map(Pipeline::getId)
+            .collect(Collectors.toSet());
+
+    // No pipelines from before the upgrade should still be open after the
+    // upgrade.
+    long numPreUpgradeOpenPipelines = preUpgradeOpenPipelines
+        .stream()
+        .filter(postUpgradeOpenPipelines::contains)
+        .count();
+    Assert.assertEquals(0, numPreUpgradeOpenPipelines);
+
     // Verify Post-Upgrade conditions on the SCM.
     testPostUpgradeConditionsSCM();
 
-    // All datanodes on the SCM should have moved to HEALTHY-READONLY state.
-    testDataNodesStateOnSCM(HEALTHY_READONLY);
-
     // Verify the SCM has driven all the DataNodes through Layout Upgrade.
     testPostUpgradeConditionsDataNodes();
 
-    // Need to wait for post finalization heartbeat from DNs.
-    LambdaTestUtils.await(30000, 5000, () -> {
-      try {
-        testDataNodesStateOnSCM(HEALTHY);
-      } catch (Throwable ex) {
-        LOG.info(ex.getMessage());
-        return false;
-      }
-      return true;
-    });
-
-    // Verify that new pipeline can be created with upgraded datanodes.
-    testPostUpgradePipelineCreation();
+    // Test that we can use a pipeline after upgrade.
+    // Will fail with exception if there are no pipelines.
+    ObjectStore store = cluster.getClient().getObjectStore();
+    store.createVolume("vol1");
+    store.getVolume("vol1").createBucket("buc1");
+    store.getVolume("vol1").getBucket("buc1").createKey("key1", 100,
+        ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to