This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-1868
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit b6c45f45483027764b1585e34fd160f9c24a1f2d
Author: Siddharth Wagle <swa...@hortonworks.com>
AuthorDate: Mon Oct 7 10:55:44 2019 -0700

    HDDS-1868. Ozone pipelines should be marked as ready only after the leader election is complete.
---
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  | 30 +++++++-
 .../common/report/PipelineReportPublisher.java     |  2 +-
 .../container/common/report/ReportManager.java     | 46 +++++++-----
 .../common/statemachine/DatanodeStateMachine.java  |  4 +
 .../server/ratis/ContainerStateMachine.java        |  7 ++
 .../transport/server/ratis/XceiverServerRatis.java | 23 ++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  1 +
 .../container/common/report/TestReportManager.java | 15 +++-
 hadoop-hdds/pom.xml                                |  2 +-
 .../hdds/scm/pipeline/PipelineReportHandler.java   | 50 ++++++++-----
 .../hdds/scm/pipeline/PipelineStateManager.java    | 15 ++--
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 20 ++---
 .../scm/pipeline/MockRatisPipelineProvider.java    | 15 ++++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 85 ++++++++++++++++++++--
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  | 48 ++++++++++--
 hadoop-ozone/pom.xml                               |  2 +-
 16 files changed, 294 insertions(+), 71 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index c62d977..d8c7267 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,8 +44,7 @@ import java.util.stream.Collectors;
  */
 public final class Pipeline {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(Pipeline.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
   private final PipelineID id;
   private final ReplicationType type;
   private final ReplicationFactor factor;
@@ -51,6 +53,8 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
   private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+  // Current reported Leader for the pipeline
+  private RaftPeerId leaderId;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -103,6 +107,17 @@ public final class Pipeline {
     return state;
   }
 
+  public RaftPeerId getLeaderId() {
+    return leaderId;
+  }
+
+  /**
+   * The Pipeline object is immutable except for the leader id, which this
+   * setter updates.
+   */
+  void setLeaderId(RaftPeerId leaderId) {
+    this.leaderId = leaderId;
+  }
+
   /**
    * Returns the list of nodes which form this pipeline.
    *
@@ -175,7 +190,7 @@
         .setType(type)
         .setFactor(factor)
         .setState(PipelineState.getProtobuf(state))
-        .setLeaderID("")
+        .setLeaderID(leaderId != null ?
+            leaderId.toString() : "")
         .addAllMembers(nodeStatus.keySet().stream()
             .map(DatanodeDetails::getProtoBufMessage)
             .collect(Collectors.toList()));
@@ -207,6 +222,8 @@
         .setFactor(pipeline.getFactor())
         .setType(pipeline.getType())
         .setState(PipelineState.fromProtobuf(pipeline.getState()))
+        .setLeaderId(StringUtils.isNotEmpty(pipeline.getLeaderID()) ?
+            RaftPeerId.valueOf(pipeline.getLeaderID()) : null)
         .setNodes(pipeline.getMembersList().stream()
             .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
         .setNodesInOrder(pipeline.getMemberOrdersList())
@@ -275,6 +292,7 @@
     private Map<DatanodeDetails, Long> nodeStatus = null;
     private List<Integer> nodeOrder = null;
    private List<DatanodeDetails> nodesInOrder = null;
+    private RaftPeerId leaderId = null;
 
     public Builder() {}
 
@@ -307,6 +325,11 @@
       return this;
     }
 
+    public Builder setLeaderId(RaftPeerId leaderId1) {
+      this.leaderId = leaderId1;
+      return this;
+    }
+
     public Builder setNodes(List<DatanodeDetails> nodes) {
       this.nodeStatus = new LinkedHashMap<>();
       nodes.forEach(node -> nodeStatus.put(node, -1L));
@@ -325,6 +348,7 @@
       Preconditions.checkNotNull(state);
       Preconditions.checkNotNull(nodeStatus);
       Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
+      pipeline.setLeaderId(leaderId);
 
       if (nodeOrder != null && !nodeOrder.isEmpty()) {
         // This branch is for build from ProtoBuf
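The new leaderId field round-trips through the existing protobuf message in
string form. A minimal sketch of that contract, using the Builder setters
shown above plus Pipeline's existing proto conversion methods (the node list
and "dn-1" peer id are hypothetical; a null leader serializes as "" and
deserializes back to null):

    // Sketch: leaderId survives Pipeline <-> HddsProtos.Pipeline conversion.
    Pipeline pipeline = Pipeline.newBuilder()
        .setId(PipelineID.randomId())
        .setType(ReplicationType.RATIS)
        .setFactor(ReplicationFactor.THREE)
        .setState(Pipeline.PipelineState.ALLOCATED)
        .setNodes(nodes)                          // List<DatanodeDetails>
        .setLeaderId(RaftPeerId.valueOf("dn-1"))  // hypothetical peer id
        .build();

    HddsProtos.Pipeline proto = pipeline.getProtobufMessage();
    Pipeline restored = Pipeline.getFromProtobuf(proto);
    // The id went out as leaderId.toString() and came back via valueOf().
    assert pipeline.getLeaderId().equals(restored.getLeaderId());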
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
index e7f4347..5bd3b58 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -33,7 +33,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL
  * Publishes Pipeline which will be sent to SCM as part of heartbeat.
  * PipelineReport consist of the following information about each containers:
  * - pipelineID
- *
+ * - LeaderID
  */
 public class PipelineReportPublisher extends
     ReportPublisher<PipelineReportsProto> {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
index 536d4cc..e0cc0fd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java
@@ -27,7 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -41,7 +43,8 @@ public final class ReportManager {
       LoggerFactory.getLogger(ReportManager.class);
 
   private final StateContext context;
-  private final List<ReportPublisher> publishers;
+  private final Map<Class<? extends GeneratedMessage>, ReportPublisher>
+      publishers;
   private final ScheduledExecutorService executorService;
 
   /**
@@ -52,7 +55,7 @@ public final class ReportManager {
    * @param publishers List of publishers which generates report
    */
   private ReportManager(StateContext context,
-      List<ReportPublisher> publishers) {
+      Map<Class<? extends GeneratedMessage>, ReportPublisher> publishers) {
     this.context = context;
     this.publishers = publishers;
     this.executorService = HadoopExecutors.newScheduledThreadPool(
@@ -66,11 +69,16 @@ public final class ReportManager {
    * report publishers.
    */
   public void init() {
-    for (ReportPublisher publisher : publishers) {
+    for (ReportPublisher publisher : publishers.values()) {
       publisher.init(context, executorService);
     }
   }
 
+  public ReportPublisher getReportPublisher(
+      Class<? extends GeneratedMessage> publisherClass) {
+    return publishers.get(publisherClass);
+  }
+
   /**
    * Shutdown the ReportManager.
    */
@@ -94,20 +102,33 @@ public final class ReportManager {
   }
 
   /**
+   * Test friendly builder
+   */
+  public static Builder newBuilder(ReportPublisherFactory publisherFactory) {
+    return new Builder(publisherFactory);
+  }
+
+  /**
    * Builder to construct {@link ReportManager}.
    */
   public static final class Builder {
 
     private StateContext stateContext;
-    private List<ReportPublisher> reportPublishers;
+    private Map<Class<? extends GeneratedMessage>, ReportPublisher>
+        reportPublishers;
     private ReportPublisherFactory publisherFactory;
 
     private Builder(Configuration conf) {
-      this.reportPublishers = new ArrayList<>();
+      this.reportPublishers = new HashMap<>();
       this.publisherFactory = new ReportPublisherFactory(conf);
     }
 
+    private Builder(ReportPublisherFactory publisherFactory) {
+      this.reportPublishers = new HashMap<>();
+      this.publisherFactory = publisherFactory;
+    }
+
     /**
      * Sets the {@link StateContext}.
      *
@@ -128,19 +149,7 @@ public final class ReportManager {
      * @return ReportManager.Builder
      */
     public Builder addPublisherFor(Class<? extends GeneratedMessage> report) {
-      reportPublishers.add(publisherFactory.getPublisherFor(report));
-      return this;
-    }
-
-    /**
-     * Adds new ReportPublisher to the ReportManager.
-     *
-     * @param publisher ReportPublisher
-     *
-     * @return ReportManager.Builder
-     */
-    public Builder addPublisher(ReportPublisher publisher) {
-      reportPublishers.add(publisher);
+      reportPublishers.put(report, publisherFactory.getPublisherFor(report));
       return this;
     }
 
@@ -153,6 +162,5 @@ public final class ReportManager {
       Preconditions.checkNotNull(stateContext);
       return new ReportManager(stateContext, reportPublishers);
     }
-
   }
 }
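Keying publishers by report type (instead of keeping a flat list) is what
lets a datanode component push a report on demand. A minimal sketch of the
lookup-and-trigger pattern, assuming a ReportManager built as above;
ReportPublisher is a Runnable, so run() publishes once immediately rather
than waiting for the next scheduled interval:

    // Sketch: trigger an out-of-band pipeline report.
    ReportPublisher publisher =
        reportManager.getReportPublisher(PipelineReportsProto.class);
    if (publisher != null) {
      publisher.run();   // e.g. right after a leader change
    }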
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c9eb702..891bbb3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -486,4 +486,8 @@ public class DatanodeStateMachine implements Closeable {
   public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
+
+  public ReportManager getReportManager() {
+    return reportManager;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index b89ec73..493b3bd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerProxy;
@@ -868,4 +870,9 @@ public class ContainerStateMachine extends BaseStateMachine {
       executor.shutdown();
     }
   }
+
+  @Override
+  public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
+      RaftPeerId raftPeerId) {
+    ratisServer.handleLeaderChangedNotification(groupMemberId, raftPeerId);
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 80e91cd..2407429 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -66,8 +69,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Collections;
 import java.util.Set;
@@ -107,6 +112,8 @@ public final class XceiverServerRatis extends XceiverServer {
   // TODO: Remove the gids set when Ratis supports an api to query active
   // pipelines
   private final Set<RaftGroupId> raftGids = new HashSet<>();
+  // pipeline leaders
+  private Map<RaftGroupId, RaftPeerId> leaderIdMap = new HashMap<>();
 
   @SuppressWarnings("parameternumber")
   private XceiverServerRatis(DatanodeDetails dd, int port,
@@ -598,6 +605,9 @@ public final class XceiverServerRatis extends XceiverServer {
     for (RaftGroupId groupId : gids) {
       reports.add(PipelineReport.newBuilder()
           .setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf())
+          .setLeaderID(leaderIdMap.containsKey(groupId) ?
+              ByteString.copyFromUtf8(leaderIdMap.get(groupId).toString()) :
+              ByteString.EMPTY)
           .build());
     }
     return reports;
@@ -686,4 +696,17 @@ public final class XceiverServerRatis extends XceiverServer {
   void notifyGroupAdd(RaftGroupId gid) {
     raftGids.add(gid);
   }
+
+  void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
+      RaftPeerId raftPeerId) {
+    LOG.info("Leader change notification received for group: {} with new " +
+        "leaderId: {}", groupMemberId.getGroupId(), raftPeerId);
+    // Save the reported leader to be sent with the report to SCM
+    leaderIdMap.put(groupMemberId.getGroupId(), raftPeerId);
+    // Publish new reports with leaderID
+    context.getParent().getReportManager().getReportPublisher(
+        PipelineReportsProto.class).run();
+    // Trigger HB immediately
+    context.getParent().triggerHeartbeat();
+  }
 }
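XceiverServerRatis learns about elections through the notifyLeaderChanged
callback that the upgraded Ratis snapshot exposes on state machines (see the
pom.xml change below). A minimal sketch of the hook in isolation, stripped of
the container plumbing above (class name is hypothetical):

    // Sketch: a Ratis state machine that caches the current leader.
    // notifyLeaderChanged fires on the datanode whenever a new leader is
    // elected for one of its Raft groups.
    import org.apache.ratis.protocol.RaftGroupMemberId;
    import org.apache.ratis.protocol.RaftPeerId;
    import org.apache.ratis.statemachine.impl.BaseStateMachine;

    class LeaderAwareStateMachine extends BaseStateMachine {
      private volatile RaftPeerId leaderId;

      @Override
      public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
          RaftPeerId newLeaderId) {
        leaderId = newLeaderId;   // picked up by the next pipeline report
      }
    }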
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index a975cd5..3699860 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -239,6 +239,7 @@ message ContainerAction {
 
 message PipelineReport {
   required PipelineID pipelineID = 1;
+  optional bytes leaderID = 2;
 }
 
 message PipelineReportsProto {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
index aae388d..b15f8d2 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,6 +31,8 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.protobuf.GeneratedMessage;
+
 /**
  * Test cases to test {@link ReportManager}.
  */
@@ -40,13 +43,21 @@ public class TestReportManager {
     Configuration conf = new OzoneConfiguration();
     StateContext dummyContext = Mockito.mock(StateContext.class);
     ReportPublisher dummyPublisher = Mockito.mock(ReportPublisher.class);
-    ReportManager.Builder builder = ReportManager.newBuilder(conf);
+    ReportPublisherFactory publisherFactory = Mockito.mock(
+        ReportPublisherFactory.class);
+    Mockito.when(publisherFactory.getPublisherFor(any())).thenReturn(
+        dummyPublisher);
+    ReportManager.Builder builder = ReportManager.newBuilder(publisherFactory);
     builder.setStateContext(dummyContext);
-    builder.addPublisher(dummyPublisher);
+    GeneratedMessage generatedMessageMock = Mockito.mock(
+        GeneratedMessage.class, Mockito.RETURNS_DEEP_STUBS);
+    builder.addPublisherFor(generatedMessageMock.getClass());
     ReportManager reportManager = builder.build();
     reportManager.init();
     verify(dummyPublisher, times(1)).init(eq(dummyContext),
         any(ScheduledExecutorService.class));
   }
+
+
 }
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index a174337..93df2d1 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -46,7 +46,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.5.0-201fc85-SNAPSHOT</ratis.version>
+    <ratis.version>0.5.0-63cd2fb-SNAPSHOT</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 793f4e2..10d40d9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -18,25 +18,28 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server
-    .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Objects;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
 
 /**
  * Handles Pipeline Reports from datanode.
@@ -50,6 +53,8 @@ public class PipelineReportHandler implements
   private final Configuration conf;
   private final SCMSafeModeManager scmSafeModeManager;
   private final boolean pipelineAvailabilityCheck;
+  private Map<PipelineID, Map<UUID, ByteString>>
+      reportedLeadersForPipeline = new HashMap<>();
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
       PipelineManager pipelineManager,
@@ -72,8 +77,8 @@ public class PipelineReportHandler implements
     DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
     PipelineReportsProto pipelineReport =
         pipelineReportFromDatanode.getReport();
-    Preconditions.checkNotNull(dn, "Pipeline Report is "
-        + "missing DatanodeDetails.");
+    Preconditions.checkNotNull(dn, "Pipeline Report is " +
+        "missing DatanodeDetails.");
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Processing pipeline report for dn: {}", dn);
     }
@@ -89,7 +94,6 @@ public class PipelineReportHandler implements
       publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
           pipelineReportFromDatanode);
     }
-
   }
 
   private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
@@ -104,12 +108,24 @@ public class PipelineReportHandler implements
       return;
     }
 
+    if (report.hasLeaderID()) {
+      Map<UUID, ByteString> ids =
+          reportedLeadersForPipeline.computeIfAbsent(pipelineID,
+              k -> new HashMap<>());
+      ids.put(dn.getUuid(), report.getLeaderID());
+    }
+
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-      LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn);
-      pipeline.reportDatanode(dn);
-      if (pipeline.isHealthy()) {
-        // if all the dns have reported, pipeline can be moved to OPEN state
+      LOGGER.info("Pipeline {} reported by {} with leaderId {}",
+          pipeline.getId(), dn, report.getLeaderID().toStringUtf8());
+      Map<UUID, ByteString> leaderIdPairs =
+          reportedLeadersForPipeline.get(pipelineID);
+      if (leaderIdPairs != null &&
+          leaderIdPairs.size() == pipeline.getFactor().getNumber() &&
+          leaderIdPairs.values().stream().distinct().count() == 1) {
+        // All datanodes reported the same leader
         pipelineManager.openPipeline(pipelineID);
+        pipeline.setLeaderId(
+            RaftPeerId.valueOf(report.getLeaderID().toStringUtf8()));
       }
     } else {
       // In OPEN state case just report the datanode
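The open condition above is deliberately strict: every member of the pipeline
must have reported, and all reported leader ids must be byte-for-byte equal.
A self-contained illustration of that predicate (the "dn-2" ids are
hypothetical; ByteString compares by content, which is what makes distinct()
meaningful here):

    import com.google.protobuf.ByteString;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class LeaderAgreementDemo {
      public static void main(String[] args) {
        // One entry per datanode that reported this pipeline.
        Map<UUID, ByteString> reported = new HashMap<>();
        reported.put(UUID.randomUUID(), ByteString.copyFromUtf8("dn-2"));
        reported.put(UUID.randomUUID(), ByteString.copyFromUtf8("dn-2"));
        reported.put(UUID.randomUUID(), ByteString.copyFromUtf8("dn-2"));

        int factor = 3;   // ReplicationFactor.THREE
        boolean open = reported.size() == factor
            && reported.values().stream().distinct().count() == 1;
        System.out.println("move pipeline to OPEN: " + open);   // true
      }
    }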
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 7615057..9033b37 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -32,6 +32,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Manages the state of pipelines in SCM. All write operations like pipeline
  * creation, removal and updates should come via SCMPipelineManager.
@@ -52,9 +54,7 @@ class PipelineStateManager {
 
   void addPipeline(Pipeline pipeline) throws IOException {
     pipelineStateMap.addPipeline(pipeline);
-    if (pipeline.getPipelineState() == PipelineState.OPEN) {
-      LOG.info("Created pipeline " + pipeline);
-    }
+    LOG.info("Created pipeline " + pipeline);
   }
 
   void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
@@ -131,8 +131,8 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
-      pipeline = pipelineStateMap
-          .updatePipelineState(pipelineId, PipelineState.OPEN);
+      pipeline = pipelineStateMap.updatePipelineState(
+          pipelineId, PipelineState.OPEN);
       LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
     }
     return pipeline;
@@ -161,4 +161,9 @@ class PipelineStateManager {
     pipelineStateMap
         .updatePipelineState(pipelineID, PipelineState.DORMANT);
   }
+
+  @VisibleForTesting
+  PipelineStateMap getPipelineStateMap() {
+    return pipelineStateMap;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 0324a58..c5dca2e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -58,6 +58,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implements Api for creating ratis pipelines.
  */
@@ -153,30 +155,23 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw new InsufficientDatanodesException(e);
     }
 
-    Pipeline pipeline = Pipeline.newBuilder()
-        .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
-        .setType(ReplicationType.RATIS)
-        .setFactor(factor)
-        .setNodes(dns)
-        .build();
+    Pipeline pipeline = create(factor, dns);
     initializePipeline(pipeline);
     return pipeline;
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor,
-                         List<DatanodeDetails> nodes) {
+      List<DatanodeDetails> nodes) {
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
         .build();
   }
 
-
   @Override
   public void shutdown() {
     forkJoinPool.shutdownNow();
@@ -253,4 +248,9 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw MultipleIOException.createIOException(exceptions);
     }
   }
+
+  @VisibleForTesting
+  PipelineStateManager getStateManager() {
+    return stateManager;
+  }
 }
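With create() now returning ALLOCATED pipelines, the provider no longer
decides readiness; the flow becomes create, leader election on the
datanodes, leader reports, then openPipeline. A compressed sketch of that
lifecycle under the APIs shown in these diffs (heartbeat plumbing and error
handling omitted; provider and manager handles are assumed to exist):

    // Sketch: pipeline lifecycle after this change.
    Pipeline pipeline = provider.create(ReplicationFactor.THREE, nodes);
    assert pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED;

    // ... datanodes elect a leader and report it via PipelineReport ...
    // PipelineReportHandler, once all replicas agree on the leader:
    pipelineManager.openPipeline(pipeline.getId());
    assert pipelineManager.getPipeline(pipeline.getId()).getPipelineState()
        == Pipeline.PipelineState.OPEN;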
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 01c53ba..a563ead 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -19,9 +19,13 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * Mock Ratis Pipeline Provider for Mock Nodes.
@@ -42,4 +46,15 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public void shutdown() {
     // Do nothing.
   }
+
+  @Override
+  public Pipeline create(HddsProtos.ReplicationFactor factor,
+      List<DatanodeDetails> nodes) {
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(Pipeline.PipelineState.OPEN)
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(nodes)
+        .build();
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..7fcd1c1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -21,16 +21,25 @@
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -40,12 +49,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.protobuf.ByteString;
 
 /**
  * Test cases to verify PipelineManager.
@@ -314,4 +318,71 @@ public class TestSCMPipelineManager {
 
     pipelineManager.close();
   }
+
+  @Test
+  public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
+    EventQueue eventQueue = new EventQueue();
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+    // close manager
+    pipelineManager.close();
+    // new pipeline manager loads the pipelines from the db in ALLOCATED state
+    pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+    mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+    SCMSafeModeManager scmSafeModeManager =
+        new SCMSafeModeManager(new OzoneConfiguration(),
+            new ArrayList<>(), pipelineManager, eventQueue);
+    PipelineReportHandler pipelineReportHandler =
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+
+    // Report pipelines with leaders
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    Assert.assertEquals(3, nodes.size());
+    // Send leader for only first 2 dns
+    nodes.subList(0, 2).forEach(dn ->
+        sendPipelineReport(dn, pipeline, pipelineReportHandler, true));
+    sendPipelineReport(nodes.get(2), pipeline, pipelineReportHandler, false);
+
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+
+    nodes.forEach(dn ->
+        sendPipelineReport(dn, pipeline, pipelineReportHandler, true));
+
+    Assert.assertEquals(Pipeline.PipelineState.OPEN,
+        pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
+  }
+
+  private void sendPipelineReport(DatanodeDetails dn,
+      Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
+      boolean sendLeaderId) {
+    PipelineReportsProto.Builder reportProtoBuilder =
+        PipelineReportsProto.newBuilder();
+    PipelineReport.Builder reportBuilder = PipelineReport.newBuilder();
+    reportBuilder.setPipelineID(pipeline.getId().getProtobuf());
+    if (sendLeaderId) {
+      reportBuilder.setLeaderID(ByteString.copyFromUtf8("raftPeer-1"));
+    }
+
+    pipelineReportHandler.onMessage(new PipelineReportFromDatanode(dn,
+        reportProtoBuilder.addPipelineReport(
+            reportBuilder.build()).build()), new EventQueue());
+  }
 }
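The readiness loop in the next diff mixes heartbeat counting with pipeline
state; if the pipeline half were split out, it reduces to a single waitFor
call. A sketch using the same GenericTestUtils.waitFor helper and the
otherwise-unused waitForPipelineOpenTimeout field the diff introduces; this
is an assumption about how that field was meant to be wired, not code from
this commit:

    // Sketch: block until every RATIS pipeline has either opened (leader
    // reported) or been closed. Polls once per second.
    GenericTestUtils.waitFor(() ->
        scm.getPipelineManager().getPipelines().stream()
            .filter(p -> p.getType() == HddsProtos.ReplicationType.RATIS)
            .allMatch(p -> p.getPipelineState() == Pipeline.PipelineState.OPEN
                || p.getPipelineState() == Pipeline.PipelineState.CLOSED),
        1000, waitForPipelineOpenTimeout);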
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..0f2e209 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -64,6 +66,7 @@ import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@@ -95,6 +98,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
 
   // Timeout for the cluster to be ready
   private int waitForClusterToBeReadyTimeout = 60000; // 1 min
+  // Timeout for all/any pipelines to be in open state
+  private int waitForPipelineOpenTimeout = 60000;
   private CertificateClient caClient;
 
   /**
@@ -143,10 +148,31 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      final boolean isReady = healthy == hddsDatanodes.size();
-      LOG.info("{}. Got {} of {} DN Heartbeats.",
-          isReady? "Cluster is ready" : "Waiting for cluster to be ready",
-          healthy, hddsDatanodes.size());
+      boolean isReady = healthy == hddsDatanodes.size();
+      boolean printIsReadyMsg = true;
+      List<Pipeline> pipelines = scm.getPipelineManager().getPipelines();
+      if (!pipelines.isEmpty()) {
+        List<Pipeline> raftPipelines = pipelines.stream().filter(p ->
+            p.getType() == HddsProtos.ReplicationType.RATIS).collect(
+            Collectors.toList());
+        if (!raftPipelines.isEmpty()) {
+          List<Pipeline> notOpenPipelines = raftPipelines.stream().filter(p ->
+              p.getPipelineState() != Pipeline.PipelineState.OPEN &&
+              p.getPipelineState() != Pipeline.PipelineState.CLOSED)
+              .collect(Collectors.toList());
+          if (notOpenPipelines.size() > 0) {
+            LOG.info("Waiting for {} of {} RATIS pipelines to report a "
+                + "leader.", notOpenPipelines.size(), raftPipelines.size());
+            isReady = false;
+            printIsReadyMsg = false;
+          }
+        }
+      }
+      if (printIsReadyMsg) {
+        LOG.info("{}. Got {} of {} DN Heartbeats.",
+            isReady ? "Cluster is ready" : "Waiting for cluster to be ready",
+            healthy, hddsDatanodes.size());
+      }
       return isReady;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
@@ -260,6 +286,18 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     ozoneManager.restart();
   }
 
+  private void waitForHddsDatanodesStop() throws TimeoutException,
+      InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      final int healthy = scm.getNodeCount(HEALTHY);
+      boolean isReady = healthy == hddsDatanodes.size();
+      if (!isReady) {
+        LOG.info("Waiting for stopped datanode to leave the healthy set; "
+            + "SCM still reports {} healthy, expecting {}.",
+            healthy, hddsDatanodes.size());
+      }
+      return isReady;
+    }, 1000, waitForClusterToBeReadyTimeout);
+  }
+
   @Override
   public void restartHddsDatanode(int i, boolean waitForDatanode)
       throws InterruptedException, TimeoutException {
@@ -279,7 +317,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     hddsDatanodes.remove(i);
     if (waitForDatanode) {
       // wait for node to be removed from SCM healthy node list.
-      waitForClusterToBeReady();
+      waitForHddsDatanodesStop();
     }
     String[] args = new String[]{};
     HddsDatanodeService service =
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 825e65c..da2f0de 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -28,7 +28,7 @@
   <properties>
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
    <ozone.version>0.5.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.5.0-201fc85-SNAPSHOT</ratis.version>
+    <ratis.version>0.5.0-63cd2fb-SNAPSHOT</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Crater Lake</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-h...@hadoop.apache.org