This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-1868
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit b6c45f45483027764b1585e34fd160f9c24a1f2d
Author: Siddharth Wagle <swa...@hortonworks.com>
AuthorDate: Mon Oct 7 10:55:44 2019 -0700

    HDDS-1868. Ozone pipelines should be marked as ready only after the leader election is complete.
---
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  | 30 +++++++-
 .../common/report/PipelineReportPublisher.java     |  2 +-
 .../container/common/report/ReportManager.java     | 46 +++++++-----
 .../common/statemachine/DatanodeStateMachine.java  |  4 +
 .../server/ratis/ContainerStateMachine.java        |  7 ++
 .../transport/server/ratis/XceiverServerRatis.java | 23 ++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  1 +
 .../container/common/report/TestReportManager.java | 15 +++-
 hadoop-hdds/pom.xml                                |  2 +-
 .../hdds/scm/pipeline/PipelineReportHandler.java   | 50 ++++++++-----
 .../hdds/scm/pipeline/PipelineStateManager.java    | 15 ++--
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 20 ++---
 .../scm/pipeline/MockRatisPipelineProvider.java    | 15 ++++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 85 ++++++++++++++++++++--
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  | 48 ++++++++++--
 hadoop-ozone/pom.xml                               |  2 +-
 16 files changed, 294 insertions(+), 71 deletions(-)
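
For orientation, the change has two halves. On the datanode, the Ratis server
now remembers the most recently elected leader of each Raft group and ships it
in every PipelineReport. On SCM, a Ratis pipeline is created in ALLOCATED
state and moved to OPEN only once every member has reported the same leader.
A minimal sketch of that SCM-side agreement check, condensed from the
PipelineReportHandler hunk further below (the class and method names here are
illustrative, not part of the patch):

    import java.util.Map;
    import java.util.UUID;
    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

    final class LeaderAgreementSketch {
      // True once every replica of the pipeline has reported a leader and
      // all of the reports name the same peer.
      static boolean allReportedSameLeader(
          Map<UUID, ByteString> leaderByDatanode, Pipeline pipeline) {
        return leaderByDatanode.size() == pipeline.getFactor().getNumber()
            && leaderByDatanode.values().stream().distinct().count() == 1;
      }
    }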

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index c62d977..d8c7267 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,8 +44,7 @@ import java.util.stream.Collectors;
  */
 public final class Pipeline {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(Pipeline.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
   private final PipelineID id;
   private final ReplicationType type;
   private final ReplicationFactor factor;
@@ -51,6 +53,8 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
  private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+  // Currently reported leader for the pipeline
+  private RaftPeerId leaderId;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -103,6 +107,17 @@ public final class Pipeline {
     return state;
   }
 
+  public RaftPeerId getLeaderId() {
+    return leaderId;
+  }
+
+  /**
+   * Pipeline is immutable except for the leader id, which this setter updates.
+   */
+  void setLeaderId(RaftPeerId leaderId) {
+    this.leaderId = leaderId;
+  }
+
   /**
    * Returns the list of nodes which form this pipeline.
    *
@@ -175,7 +190,7 @@ public final class Pipeline {
         .setType(type)
         .setFactor(factor)
         .setState(PipelineState.getProtobuf(state))
-        .setLeaderID("")
+        .setLeaderID(leaderId != null ? leaderId.toString() : "")
         .addAllMembers(nodeStatus.keySet().stream()
             .map(DatanodeDetails::getProtoBufMessage)
             .collect(Collectors.toList()));
@@ -207,6 +222,8 @@ public final class Pipeline {
         .setFactor(pipeline.getFactor())
         .setType(pipeline.getType())
         .setState(PipelineState.fromProtobuf(pipeline.getState()))
+        .setLeaderId(StringUtils.isNotEmpty(pipeline.getLeaderID()) ?
+            RaftPeerId.valueOf(pipeline.getLeaderID()) : null)
         .setNodes(pipeline.getMembersList().stream()
             .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
         .setNodesInOrder(pipeline.getMemberOrdersList())
@@ -275,6 +292,7 @@ public final class Pipeline {
     private Map<DatanodeDetails, Long> nodeStatus = null;
     private List<Integer> nodeOrder = null;
     private List<DatanodeDetails> nodesInOrder = null;
+    private RaftPeerId leaderId = null;
 
     public Builder() {}
 
@@ -307,6 +325,11 @@ public final class Pipeline {
       return this;
     }
 
+    public Builder setLeaderId(RaftPeerId leaderId1) {
+      this.leaderId = leaderId1;
+      return this;
+    }
+
     public Builder setNodes(List<DatanodeDetails> nodes) {
       this.nodeStatus = new LinkedHashMap<>();
       nodes.forEach(node -> nodeStatus.put(node, -1L));
@@ -325,6 +348,7 @@ public final class Pipeline {
       Preconditions.checkNotNull(state);
       Preconditions.checkNotNull(nodeStatus);
       Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
+      pipeline.setLeaderId(leaderId);
 
       if (nodeOrder != null && !nodeOrder.isEmpty()) {
         // This branch is for build from ProtoBuf
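
To see the new plumbing end to end: a sketch of building a Pipeline with a
leader and serializing it. The builder chain mirrors MockRatisPipelineProvider
further below; the datanodes list and the peer id "dn-1" are hypothetical, and
the serialization call is assumed to be the existing getProtobufMessage():

    // Assumes a List<DatanodeDetails> datanodes prepared elsewhere.
    Pipeline pipeline = Pipeline.newBuilder()
        .setId(PipelineID.randomId())
        .setState(Pipeline.PipelineState.ALLOCATED)
        .setType(HddsProtos.ReplicationType.RATIS)
        .setFactor(HddsProtos.ReplicationFactor.THREE)
        .setNodes(datanodes)
        .setLeaderId(RaftPeerId.valueOf("dn-1"))  // hypothetical peer id
        .build();
    // The proto now carries the reported leader; before this patch the
    // leaderID field was always serialized as "".
    HddsProtos.Pipeline proto = pipeline.getProtobufMessage();
    assert "dn-1".equals(proto.getLeaderID());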
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
index e7f4347..5bd3b58 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -33,7 +33,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVA
  * Publishes Pipeline which will be sent to SCM as part of heartbeat.
 * PipelineReport consists of the following information about each pipeline:
  *   - pipelineID
- *
+ *   - LeaderID
  */
 public class PipelineReportPublisher extends
     ReportPublisher<PipelineReportsProto> {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
index 536d4cc..e0cc0fd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
@@ -27,7 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -41,7 +43,8 @@ public final class ReportManager {
       LoggerFactory.getLogger(ReportManager.class);
 
   private final StateContext context;
-  private final List<ReportPublisher> publishers;
+  private final Map<Class<? extends GeneratedMessage>, ReportPublisher>
+      publishers;
   private final ScheduledExecutorService executorService;
 
   /**
@@ -52,7 +55,7 @@ public final class ReportManager {
   * @param publishers Map of publishers which generate reports
    */
   private ReportManager(StateContext context,
-                        List<ReportPublisher> publishers) {
+        Map<Class<? extends GeneratedMessage>, ReportPublisher> publishers) {
     this.context = context;
     this.publishers = publishers;
     this.executorService = HadoopExecutors.newScheduledThreadPool(
@@ -66,11 +69,16 @@ public final class ReportManager {
    * report publishers.
    */
   public void init() {
-    for (ReportPublisher publisher : publishers) {
+    for (ReportPublisher publisher : publishers.values()) {
       publisher.init(context, executorService);
     }
   }
 
+  public ReportPublisher getReportPublisher(
+      Class<? extends GeneratedMessage> publisherClass) {
+    return publishers.get(publisherClass);
+  }
+
   /**
    * Shutdown the ReportManager.
    */
@@ -94,20 +102,33 @@ public final class ReportManager {
   }
 
   /**
+   * Test-friendly builder.
+   */
+  public static Builder newBuilder(ReportPublisherFactory publisherFactory) {
+    return new Builder(publisherFactory);
+  }
+
+  /**
    * Builder to construct {@link ReportManager}.
    */
   public static final class Builder {
 
     private StateContext stateContext;
-    private List<ReportPublisher> reportPublishers;
+    private Map<Class<? extends GeneratedMessage>, ReportPublisher>
+        reportPublishers;
     private ReportPublisherFactory publisherFactory;
 
 
     private Builder(Configuration conf) {
-      this.reportPublishers = new ArrayList<>();
+      this.reportPublishers = new HashMap<>();
       this.publisherFactory = new ReportPublisherFactory(conf);
     }
 
+    private Builder(ReportPublisherFactory publisherFactory) {
+      this.reportPublishers = new HashMap<>();
+      this.publisherFactory = publisherFactory;
+    }
+
     /**
      * Sets the {@link StateContext}.
      *
@@ -128,19 +149,7 @@ public final class ReportManager {
      * @return ReportManager.Builder
      */
     public Builder addPublisherFor(Class<? extends GeneratedMessage> report) {
-      reportPublishers.add(publisherFactory.getPublisherFor(report));
-      return this;
-    }
-
-    /**
-     * Adds new ReportPublisher to the ReportManager.
-     *
-     * @param publisher ReportPublisher
-     *
-     * @return ReportManager.Builder
-     */
-    public Builder addPublisher(ReportPublisher publisher) {
-      reportPublishers.add(publisher);
+      reportPublishers.put(report, publisherFactory.getPublisherFor(report));
       return this;
     }
 
@@ -153,6 +162,5 @@ public final class ReportManager {
       Preconditions.checkNotNull(stateContext);
       return new ReportManager(stateContext, reportPublishers);
     }
-
   }
 }
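
Keying publishers by report class is what makes on-demand publishing possible.
A short usage sketch (reportManager is whatever instance the datanode already
holds; the null check matters because only report types registered through
addPublisherFor appear as keys):

    // Publish the pipeline report immediately instead of waiting for the
    // next scheduled interval.
    ReportPublisher publisher =
        reportManager.getReportPublisher(PipelineReportsProto.class);
    if (publisher != null) {  // null when no publisher was registered
      publisher.run();        // same call XceiverServerRatis makes below
    }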
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c9eb702..891bbb3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -486,4 +486,8 @@ public class DatanodeStateMachine implements Closeable {
   public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
+
+  public ReportManager getReportManager() {
+    return reportManager;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index b89ec73..493b3bd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerProxy;
@@ -868,4 +870,9 @@ public class ContainerStateMachine extends BaseStateMachine {
       executor.shutdown();
     }
   }
+
+  @Override
+  public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId) {
+    ratisServer.handleLeaderChangedNotification(groupMemberId, raftPeerId);
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 80e91cd..2407429 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -66,8 +69,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Collections;
 import java.util.Set;
@@ -107,6 +112,8 @@ public final class XceiverServerRatis extends XceiverServer {
   // TODO: Remove the gids set when Ratis supports an api to query active
   // pipelines
   private final Set<RaftGroupId> raftGids = new HashSet<>();
+  // Last known leader for each pipeline (Raft group)
+  private Map<RaftGroupId, RaftPeerId> leaderIdMap = new HashMap<>();
 
   @SuppressWarnings("parameternumber")
   private XceiverServerRatis(DatanodeDetails dd, int port,
@@ -598,6 +605,9 @@ public final class XceiverServerRatis extends XceiverServer {
       for (RaftGroupId groupId : gids) {
         reports.add(PipelineReport.newBuilder()
             .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf())
+            .setLeaderID(leaderIdMap.containsKey(groupId) ?
+                ByteString.copyFromUtf8(leaderIdMap.get(groupId).toString()) :
+                ByteString.EMPTY)
             .build());
       }
       return reports;
@@ -686,4 +696,17 @@ public final class XceiverServerRatis extends XceiverServer {
   void notifyGroupAdd(RaftGroupId gid) {
     raftGids.add(gid);
   }
+
+  void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
+                                       RaftPeerId raftPeerId) {
+    LOG.info("Leader change notification received for group: {} with new " +
+        "leaderId: {}", groupMemberId.getGroupId(), raftPeerId);
+    // Save the reported leader to be sent with the report to SCM
+    leaderIdMap.put(groupMemberId.getGroupId(), raftPeerId);
+    // Publish new reports with leaderID
+    context.getParent().getReportManager().getReportPublisher(
+        PipelineReportsProto.class).run();
+    // Trigger HB immediately
+    context.getParent().triggerHeartbeat();
+  }
 }
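
Taken together with the ContainerStateMachine hunk above, the datanode-side
notification path is, in rough call order:

    notifyLeaderChanged(groupMemberId, raftPeerId)   // Ratis callback
      -> handleLeaderChangedNotification(...)        // delegate to server
      -> leaderIdMap.put(groupId, raftPeerId)        // remember the leader
      -> getReportPublisher(PipelineReportsProto.class).run()  // new report
      -> triggerHeartbeat()                          // push it to SCM now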
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index a975cd5..3699860 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -239,6 +239,7 @@ message ContainerAction {
 
 message PipelineReport {
   required PipelineID pipelineID = 1;
+  optional bytes leaderID = 2;
 }
 
 message PipelineReportsProto {
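
Because leaderID is an optional bytes field, consumers should gate on presence
rather than reading it unconditionally; an absent field reads back as empty
bytes. A guard in the style the handler below uses:

    // Only record a leader when the datanode actually reported one.
    if (report.hasLeaderID()) {
      String leader = report.getLeaderID().toStringUtf8();
      // ... remember the leader for this (pipeline, datanode) pair ...
    }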
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
index aae388d..b15f8d2 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,6 +31,8 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.protobuf.GeneratedMessage;
+
 /**
  * Test cases to test {@link ReportManager}.
  */
@@ -40,13 +43,21 @@ public class TestReportManager {
     Configuration conf = new OzoneConfiguration();
     StateContext dummyContext = Mockito.mock(StateContext.class);
     ReportPublisher dummyPublisher = Mockito.mock(ReportPublisher.class);
-    ReportManager.Builder builder = ReportManager.newBuilder(conf);
+    ReportPublisherFactory publisherFactory = Mockito.mock(
+        ReportPublisherFactory.class);
+    Mockito.when(publisherFactory.getPublisherFor(any())).thenReturn(
+        dummyPublisher);
+    ReportManager.Builder builder = ReportManager.newBuilder(publisherFactory);
     builder.setStateContext(dummyContext);
-    builder.addPublisher(dummyPublisher);
+    GeneratedMessage generatedMessageMock = Mockito.mock(
+        GeneratedMessage.class, Mockito.RETURNS_DEEP_STUBS);
+    builder.addPublisherFor(generatedMessageMock.getClass());
     ReportManager reportManager = builder.build();
     reportManager.init();
     verify(dummyPublisher, times(1)).init(eq(dummyContext),
         any(ScheduledExecutorService.class));
 
   }
+
+
 }
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index a174337..93df2d1 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -46,7 +46,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.5.0-201fc85-SNAPSHOT</ratis.version>
+    <ratis.version>0.5.0-63cd2fb-SNAPSHOT</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 793f4e2..10d40d9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -18,25 +18,28 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server
-    .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Objects;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 
 /**
  * Handles Pipeline Reports from datanode.
@@ -50,6 +53,8 @@ public class PipelineReportHandler implements
   private final Configuration conf;
   private final SCMSafeModeManager scmSafeModeManager;
   private final boolean pipelineAvailabilityCheck;
+  private Map<PipelineID, Map<UUID, ByteString>>
+      reportedLeadersForPipeline = new HashMap<>();
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
       PipelineManager pipelineManager,
@@ -72,8 +77,8 @@ public class PipelineReportHandler implements
     DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
     PipelineReportsProto pipelineReport =
         pipelineReportFromDatanode.getReport();
-    Preconditions.checkNotNull(dn, "Pipeline Report is "
-        + "missing DatanodeDetails.");
+    Preconditions.checkNotNull(dn, "Pipeline Report is " +
+        "missing DatanodeDetails.");
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Processing pipeline report for dn: {}", dn);
     }
@@ -89,7 +94,6 @@ public class PipelineReportHandler implements
       publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
           pipelineReportFromDatanode);
     }
-
   }
 
   private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
@@ -104,12 +108,24 @@ public class PipelineReportHandler implements
       return;
     }
 
+    if (report.hasLeaderID()) {
+      Map<UUID, ByteString> ids =
+          reportedLeadersForPipeline.computeIfAbsent(pipelineID,
+              k -> new HashMap<>());
+      ids.put(dn.getUuid(), report.getLeaderID());
+    }
+
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-      LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn);
-      pipeline.reportDatanode(dn);
-      if (pipeline.isHealthy()) {
-        // if all the dns have reported, pipeline can be moved to OPEN state
+      LOGGER.info("Pipeline {} reported by {} with leaderId {}",
+          pipeline.getId(), dn, report.getLeaderID().toStringUtf8());
+      Map<UUID, ByteString> leaderIdPairs =
+          reportedLeadersForPipeline.get(pipelineID);
+      if (leaderIdPairs.size() == pipeline.getFactor().getNumber() &&
+          leaderIdPairs.values().stream().distinct().count() == 1) {
+        // All datanodes reported same leader
         pipelineManager.openPipeline(pipelineID);
+        pipeline.setLeaderId(
+            RaftPeerId.valueOf(report.getLeaderID().toStringUtf8()));
       }
     } else {
       // In OPEN state case just report the datanode
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 7615057..9033b37 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -32,6 +32,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Manages the state of pipelines in SCM. All write operations like pipeline
  * creation, removal and updates should come via SCMPipelineManager.
@@ -52,9 +54,7 @@ class PipelineStateManager {
 
   void addPipeline(Pipeline pipeline) throws IOException {
     pipelineStateMap.addPipeline(pipeline);
-    if (pipeline.getPipelineState() == PipelineState.OPEN) {
-      LOG.info("Created pipeline " + pipeline);
-    }
+    LOG.info("Created pipeline " + pipeline);
   }
 
   void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
@@ -131,8 +131,8 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
-      pipeline = pipelineStateMap
-          .updatePipelineState(pipelineId, PipelineState.OPEN);
+      pipeline = pipelineStateMap.updatePipelineState(
+          pipelineId, PipelineState.OPEN);
       LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
     }
     return pipeline;
@@ -161,4 +161,9 @@ class PipelineStateManager {
     pipelineStateMap
         .updatePipelineState(pipelineID, PipelineState.DORMANT);
   }
+
+  @VisibleForTesting
+  PipelineStateMap getPipelineStateMap() {
+    return pipelineStateMap;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 0324a58..c5dca2e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -58,6 +58,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implements Api for creating ratis pipelines.
  */
@@ -153,30 +155,23 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw new InsufficientDatanodesException(e);
     }
 
-    Pipeline pipeline = Pipeline.newBuilder()
-        .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
-        .setType(ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(dns)
-        .build();
+    Pipeline pipeline = create(factor, dns);
     initializePipeline(pipeline);
     return pipeline;
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor,
-      List<DatanodeDetails> nodes) {
+                         List<DatanodeDetails> nodes) {
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
         .build();
   }
 
-
   @Override
   public void shutdown() {
     forkJoinPool.shutdownNow();
@@ -253,4 +248,9 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw MultipleIOException.createIOException(exceptions);
     }
   }
+
+  @VisibleForTesting
+  PipelineStateManager getStateManager() {
+    return stateManager;
+  }
 }
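
Net effect of this file, combined with the handler change above: Ratis
pipelines now begin life in ALLOCATED rather than OPEN, so the externally
visible lifecycle becomes roughly

    ALLOCATED --(all members report the same leader)--> OPEN --> CLOSED

The mock provider below keeps creating pipelines directly in OPEN so unit
tests that never send pipeline reports continue to pass.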
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 01c53ba..a563ead 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -19,9 +19,13 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Mock Ratis Pipeline Provider for Mock Nodes.
@@ -42,4 +46,15 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public void shutdown() {
     // Do nothing.
   }
+
+  @Override
+  public Pipeline create(HddsProtos.ReplicationFactor factor, List<DatanodeDetails> nodes) {
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(Pipeline.PipelineState.OPEN)
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(nodes)
+        .build();
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..7fcd1c1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -21,16 +21,25 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -40,12 +49,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.protobuf.ByteString;
 
 /**
  * Test cases to verify PipelineManager.
@@ -314,4 +318,71 @@ public class TestSCMPipelineManager {
 
     pipelineManager.close();
   }
+
+  @Test
+  public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
+    EventQueue eventQueue = new EventQueue();
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    // close manager
+    pipelineManager.close();
+    // new pipeline manager loads the pipelines from the db in ALLOCATED state
+    pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+    mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+    SCMSafeModeManager scmSafeModeManager =
+        new SCMSafeModeManager(new OzoneConfiguration(),
+            new ArrayList<>(), pipelineManager, eventQueue);
+    PipelineReportHandler pipelineReportHandler =
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+
+    // Report pipelines with leaders
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    Assert.assertEquals(3, nodes.size());
+    // Send leader for only first 2 dns
+    nodes.subList(0, 2).forEach(dn ->
+        sendPipelineReport(dn, pipeline, pipelineReportHandler, true));
+    sendPipelineReport(nodes.get(2), pipeline, pipelineReportHandler, false);
+
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+    nodes.forEach(dn ->
+        sendPipelineReport(dn, pipeline, pipelineReportHandler, true));
+
+    Assert.assertEquals(Pipeline.PipelineState.OPEN,
+        pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+  }
+
+  private void sendPipelineReport(DatanodeDetails dn,
+      Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
+      boolean sendLeaderId) {
+
+    PipelineReportsProto.Builder reportProtoBuilder =
+        PipelineReportsProto.newBuilder();
+    PipelineReport.Builder reportBuilder = PipelineReport.newBuilder();
+    reportBuilder.setPipelineID(pipeline.getId().getProtobuf());
+    if (sendLeaderId) {
+      reportBuilder.setLeaderID(ByteString.copyFromUtf8("raftPeer-1"));
+    }
+
+    pipelineReportHandler.onMessage(new PipelineReportFromDatanode(dn,
+        reportProtoBuilder.addPipelineReport(
+            reportBuilder.build()).build()), new EventQueue());
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..0f2e209 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -64,6 +66,7 @@ import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@@ -95,6 +98,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
 
   // Timeout for the cluster to be ready
   private int waitForClusterToBeReadyTimeout = 60000; // 1 min
+  // Timeout for all/any pipelines to be in open state
+  private int waitForPipelineOpenTimeout = 60000;
   private CertificateClient caClient;
 
   /**
@@ -143,10 +148,31 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      final boolean isReady = healthy == hddsDatanodes.size();
-      LOG.info("{}. Got {} of {} DN Heartbeats.",
-          isReady? "Cluster is ready" : "Waiting for cluster to be ready",
-          healthy, hddsDatanodes.size());
+      boolean isReady = healthy == hddsDatanodes.size();
+      boolean printIsReadyMsg = true;
+      List<Pipeline> pipelines = scm.getPipelineManager().getPipelines();
+      if (!pipelines.isEmpty()) {
+        List<Pipeline> raftPipelines = pipelines.stream().filter(p ->
+            p.getType() == HddsProtos.ReplicationType.RATIS).collect(
+                Collectors.toList());
+        if (!raftPipelines.isEmpty()) {
+          List<Pipeline> notOpenPipelines = raftPipelines.stream().filter(p ->
+              p.getPipelineState() != Pipeline.PipelineState.OPEN &&
+                  p.getPipelineState() != Pipeline.PipelineState.CLOSED)
+              .collect(Collectors.toList());
+          if (notOpenPipelines.size() > 0) {
+            LOG.info("Waiting for {} number of pipelines out of {}, to report "
+                + "a leader.", notOpenPipelines.size(), raftPipelines.size());
+            isReady = false;
+            printIsReadyMsg = false;
+          }
+        }
+      }
+      if (printIsReadyMsg) {
+        LOG.info("{}. Got {} of {} DN Heartbeats.",
+            isReady ? "Cluster is ready" : "Waiting for cluster to be ready",
+            healthy, hddsDatanodes.size());
+      }
       return isReady;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
@@ -260,6 +286,18 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     ozoneManager.restart();
   }
 
+  private void waitForHddsDatanodesStop() throws TimeoutException,
+      InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      final int healthy = scm.getNodeCount(HEALTHY);
+      boolean isReady = healthy == hddsDatanodes.size();
+      if (!isReady) {
+        LOG.info("Waiting on {} datanodes to be marked unhealthy.", healthy);
+      }
+      return isReady;
+    }, 1000, waitForClusterToBeReadyTimeout);
+  }
+
   @Override
   public void restartHddsDatanode(int i, boolean waitForDatanode)
       throws InterruptedException, TimeoutException {
@@ -279,7 +317,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     hddsDatanodes.remove(i);
     if (waitForDatanode) {
       // wait for node to be removed from SCM healthy node list.
-      waitForClusterToBeReady();
+      waitForHddsDatanodesStop();
     }
     String[] args = new String[]{};
     HddsDatanodeService service =
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 825e65c..da2f0de 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -28,7 +28,7 @@
   <properties>
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
     <ozone.version>0.5.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.5.0-201fc85-SNAPSHOT</ratis.version>
+    <ratis.version>0.5.0-63cd2fb-SNAPSHOT</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Crater Lake</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>

