This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a0a936d  HDDS-4496. Separate client and server2server GRPC services of 
datanode (#1636)
a0a936d is described below

commit a0a936dda93ee513c8eaf1906cd359d10b02b892
Author: Elek, Márton <[email protected]>
AuthorDate: Wed Jan 6 11:36:45 2021 +0100

    HDDS-4496. Separate client and server2server GRPC services of datanode 
(#1636)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  18 ++-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  12 +-
 .../common/transport/server/XceiverServerGrpc.java |   8 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  39 ++++--
 .../replication/GrpcReplicationClient.java         |  15 ++-
 .../container/replication/ReplicationServer.java   | 149 +++++++++++++++++++++
 .../replication/SimpleContainerDownloader.java     |   6 +-
 .../replication/TestSimpleContainerDownloader.java |   4 +-
 .../ozone/container/common/TestEndPoint.java       |  54 ++++----
 hadoop-ozone/dev-support/intellij/ozone-site.xml   |   4 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   3 +
 .../container/metrics/TestContainerMetrics.java    |  13 +-
 .../container/server/TestContainerServer.java      |  75 +++++------
 .../server/TestSecureContainerServer.java          |  57 ++++----
 14 files changed, 311 insertions(+), 146 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 3c1c6ec..60fedd2 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -18,17 +18,19 @@
 
 package org.apache.hadoop.hdds.protocol;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NodeImpl;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 /**
  * DatanodeDetails class contains details about DataNode like:
@@ -183,6 +185,10 @@ public class DatanodeDetails extends NodeImpl implements
     ports.add(port);
   }
 
+  public void setPort(Name name, int port) {
+    setPort(new Port(name, port));
+  }
+
   /**
    * Returns all the Ports used by DataNode.
    *
@@ -678,7 +684,7 @@ public class DatanodeDetails extends NodeImpl implements
      * Ports that are supported in DataNode.
      */
     public enum Name {
-      STANDALONE, RATIS, REST
+      STANDALONE, RATIS, REST, REPLICATION
     }
 
     private Name name;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index cfb22e3..2d1d4e3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone;
 
+import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -27,10 +28,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.sun.jmx.mbeanserver.Introspector;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
 import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -50,6 +49,7 @@ import 
org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClie
 import 
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.server.http.RatisDropwizardExports;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -61,22 +61,20 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.sun.jmx.mbeanserver.Introspector;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
 import static 
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
-
-import org.apache.hadoop.util.Time;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import picocli.CommandLine.Command;
 
-import javax.management.ObjectName;
-
 /**
  * Datanode service plugin to start the HDDS container services.
  */
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 3647af1..4bbde9d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -45,7 +45,6 @@ import com.google.common.base.Preconditions;
 import io.opentracing.Scope;
 import io.opentracing.Span;
 import io.opentracing.util.GlobalTracer;
-import org.apache.ratis.thirdparty.io.grpc.BindableService;
 import org.apache.ratis.thirdparty.io.grpc.Server;
 import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
@@ -78,8 +77,7 @@ public final class XceiverServerGrpc implements 
XceiverServerSpi {
    */
   public XceiverServerGrpc(DatanodeDetails datanodeDetails,
       ConfigurationSource conf,
-      ContainerDispatcher dispatcher, CertificateClient caClient,
-      BindableService... additionalServices) {
+      ContainerDispatcher dispatcher, CertificateClient caClient) {
     Preconditions.checkNotNull(conf);
 
     this.id = datanodeDetails.getUuid();
@@ -100,10 +98,6 @@ public final class XceiverServerGrpc implements 
XceiverServerSpi {
     nettyServerBuilder.addService(ServerInterceptors.intercept(
         new GrpcXceiverService(dispatcher), tracingInterceptor));
 
-    for (BindableService service : additionalServices) {
-      nettyServerBuilder.addService(service);
-    }
-
     SecurityConfig secConf = new SecurityConfig(conf);
     if (secConf.isGrpcTlsEnabled()) {
       try {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index a44ef38..3ecddac 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -29,6 +29,7 @@ import java.util.function.Consumer;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -52,8 +53,8 @@ import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
-import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
-import 
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer;
+import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -85,19 +86,25 @@ public class OzoneContainer {
   private List<ContainerDataScanner> dataScanners;
   private final BlockDeletingService blockDeletingService;
   private final GrpcTlsConfig tlsClientConfig;
+  private final ReplicationServer replicationServer;
+  private DatanodeDetails datanodeDetails;
 
   /**
    * Construct OzoneContainer object.
+   *
    * @param datanodeDetails
    * @param conf
    * @param certClient
    * @throws DiskOutOfSpaceException
    * @throws IOException
    */
-  public OzoneContainer(DatanodeDetails datanodeDetails, ConfigurationSource
-      conf, StateContext context, CertificateClient certClient)
+  public OzoneContainer(
+      DatanodeDetails datanodeDetails, ConfigurationSource
+      conf, StateContext context, CertificateClient certClient
+  )
       throws IOException {
     config = conf;
+    this.datanodeDetails = datanodeDetails;
     volumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf);
     volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
     containerSet = new ContainerSet();
@@ -135,14 +142,22 @@ public class OzoneContainer {
      * XceiverServerGrpc is the read channel
      */
     controller = new ContainerController(containerSet, handlers);
+
     writeChannel = XceiverServerRatis.newXceiverServerRatis(
         datanodeDetails, config, hddsDispatcher, controller, certClient,
         context);
+
+    replicationServer = new ReplicationServer(
+        controller,
+        conf.getObject(ReplicationConfig.class),
+        secConf,
+        certClient);
+
     readChannel = new XceiverServerGrpc(
-        datanodeDetails, config, hddsDispatcher, certClient,
-        createReplicationService());
+        datanodeDetails, config, hddsDispatcher, certClient);
     Duration svcInterval = conf.getObject(
             DatanodeConfiguration.class).getBlockDeletionInterval();
+
     long serviceTimeout = config
         .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
             OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
@@ -158,10 +173,7 @@ public class OzoneContainer {
     return tlsClientConfig;
   }
 
-  private GrpcReplicationService createReplicationService() {
-    return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(controller));
-  }
+
 
   /**
    * Build's container map.
@@ -169,7 +181,7 @@ public class OzoneContainer {
   private void buildContainerSet() {
     Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
         .iterator();
-    ArrayList<Thread> volumeThreads = new ArrayList<Thread>();
+    ArrayList<Thread> volumeThreads = new ArrayList<>();
     long startTime = System.currentTimeMillis();
 
     //TODO: diskchecker should be run before this, to see how disks are.
@@ -242,6 +254,10 @@ public class OzoneContainer {
   public void start(String scmId) throws IOException {
     LOG.info("Attempting to start container services.");
     startContainerScrub();
+
+    replicationServer.start();
+    datanodeDetails.setPort(Name.REPLICATION, replicationServer.getPort());
+
     writeChannel.start();
     readChannel.start();
     hddsDispatcher.init();
@@ -256,6 +272,7 @@ public class OzoneContainer {
     //TODO: at end of container IO integration work.
     LOG.info("Attempting to stop container services.");
     stopContainerScrub();
+    replicationServer.stop();
     writeChannel.stop();
     readChannel.stop();
     this.handlers.values().forEach(Handler::stop);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 275321d..53dac9d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.replication;
 
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -40,6 +41,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,20 +60,27 @@ public class GrpcReplicationClient implements AutoCloseable{
 
   private final Path workingDirectory;
 
-  public GrpcReplicationClient(String host, int port, Path workingDir,
-      SecurityConfig secConfig, X509Certificate caCert) throws IOException {
+  public GrpcReplicationClient(
+      String host, int port, Path workingDir,
+      SecurityConfig secConfig, X509Certificate caCert
+  ) throws IOException {
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(host, port)
             .usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
 
-    if (secConfig.isGrpcTlsEnabled()) {
+    if (secConfig.isSecurityEnabled()) {
       channelBuilder.useTransportSecurity();
 
       SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
       if (caCert != null) {
         sslContextBuilder.trustManager(caCert);
       }
+
+      sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+      sslContextBuilder.keyManager(
+          new File(secConfig.getCertificateFileName()),
+          new File(secConfig.getPrivateKeyFileName()));
       if (secConfig.useTestCert()) {
         channelBuilder.overrideAuthority("localhost");
       }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
new file mode 100644
index 0000000..1288b22
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Separated network server for server2server container replication.
+ */
+public class ReplicationServer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationServer.class);
+
+  private Server server;
+
+  private SecurityConfig secConf;
+
+  private CertificateClient caClient;
+
+  private ContainerController controller;
+
+  private int port;
+
+  public ReplicationServer(
+      ContainerController controller,
+      ReplicationConfig replicationConfig,
+      SecurityConfig secConf,
+      CertificateClient caClient
+  ) {
+    this.secConf = secConf;
+    this.caClient = caClient;
+    this.controller = controller;
+    this.port = replicationConfig.getPort();
+    init();
+  }
+
+  public void init() {
+    NettyServerBuilder nettyServerBuilder =
+        ((NettyServerBuilder) ServerBuilder.forPort(port))
+            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+    GrpcServerInterceptor tracingInterceptor = new GrpcServerInterceptor();
+    nettyServerBuilder
+        .addService(ServerInterceptors.intercept(new GrpcReplicationService(
+            new OnDemandContainerReplicationSource(controller)
+        ), tracingInterceptor));
+
+    if (secConf.isSecurityEnabled()) {
+      try {
+        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
+            caClient.getPrivateKey(), caClient.getCertificate());
+
+        sslContextBuilder = GrpcSslContexts.configure(
+            sslContextBuilder, secConf.getGrpcSslProvider());
+
+        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+        sslContextBuilder.trustManager(caClient.getCACertificate());
+
+        nettyServerBuilder.sslContext(sslContextBuilder.build());
+      } catch (SSLException ex) {
+        throw new IllegalArgumentException(
+            "Unable to setup TLS for secure datanode replication GRPC "
+                + "endpoint.", ex);
+      }
+    }
+
+    server = nettyServerBuilder.build();
+  }
+
+  public void start() throws IOException {
+    server.start();
+
+    if (port == 0) {
+      LOG.info("{} is started using port {}", getClass().getSimpleName(),
+          server.getPort());
+    }
+
+    port = server.getPort();
+
+  }
+
+  public void stop() {
+    try {
+      server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOG.warn("{} couldn't be stopped gracefully", 
getClass().getSimpleName());
+    }
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @ConfigGroup(prefix = "hdds.datanode.replication")
+  public static final class ReplicationConfig {
+
+    @Config(key = "port", defaultValue = "9886", description = "Port used for"
+        + " the server2server replication server", tags = {
+        ConfigTag.MANAGEMENT})
+    private int port;
+
+    public int getPort() {
+      return port;
+    }
+
+    public ReplicationConfig setPort(int portParam) {
+      this.port = portParam;
+      return this;
+    }
+  }
+
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 5d8a86b..0967503 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.replication;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
@@ -87,6 +88,7 @@ public class SimpleContainerDownloader implements 
ContainerDownloader {
         if (result == null) {
           result = downloadContainer(containerId, datanode);
         } else {
+
           result = result.exceptionally(t -> {
             LOG.error("Error on replicating container: " + containerId, t);
             try {
@@ -128,11 +130,11 @@ public class SimpleContainerDownloader implements 
ContainerDownloader {
   protected CompletableFuture<Path> downloadContainer(
       long containerId,
       DatanodeDetails datanode
-  ) throws Exception {
+  ) throws IOException {
     CompletableFuture<Path> result;
     GrpcReplicationClient grpcReplicationClient =
         new GrpcReplicationClient(datanode.getIpAddress(),
-            datanode.getPort(Name.STANDALONE).getValue(),
+            datanode.getPort(Name.REPLICATION).getValue(),
             workingDirectory, securityConfig, caCert);
     result = grpcReplicationClient.download(containerId)
         .thenApply(r -> {
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index f29b157..7070425 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -115,7 +115,7 @@ public class TestSimpleContainerDownloader {
           @Override
           protected CompletableFuture<Path> downloadContainer(
               long containerId, DatanodeDetails datanode
-          ) throws Exception {
+          ) {
             //download is always successful.
             return CompletableFuture
                 .completedFuture(Paths.get(datanode.getUuidString()));
@@ -169,7 +169,7 @@ public class TestSimpleContainerDownloader {
       protected CompletableFuture<Path> downloadContainer(
           long containerId,
           DatanodeDetails datanode
-      ) throws Exception {
+      ) {
 
         if (datanodes.contains(datanode)) {
           if (directException) {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 8cad8b0..7a71c80 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -53,6 +53,7 @@ import 
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointT
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
@@ -105,6 +106,7 @@ public class TestEndPoint {
     config
         .setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
     config.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL, "1s");
+    config.setFromObject(new ReplicationConfig().setPort(0));
   }
 
   @Test
@@ -114,8 +116,8 @@ public class TestEndPoint {
    */
   public void testGetVersion() throws Exception {
     try (EndpointStateMachine rpcEndPoint =
-             createEndpoint(SCMTestUtils.getConf(),
-                 serverAddress, 1000)) {
+        createEndpoint(SCMTestUtils.getConf(),
+            serverAddress, 1000)) {
       SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint()
           .getVersion(null);
       Assert.assertNotNull(responseProto);
@@ -133,6 +135,7 @@ public class TestEndPoint {
    */
   public void testGetVersionTask() throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
+    conf.setFromObject(new ReplicationConfig().setPort(0));
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
@@ -160,6 +163,7 @@ public class TestEndPoint {
         true);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         true);
+    conf.setFromObject(new ReplicationConfig().setPort(0));
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
@@ -187,7 +191,7 @@ public class TestEndPoint {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       newState = versionTask.call();
       Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
-            newState);
+          newState);
       List<HddsVolume> volumesList = ozoneContainer.getVolumeSet()
           .getFailedVolumesList();
       Assert.assertTrue(volumesList.size() == 1);
@@ -204,8 +208,6 @@ public class TestEndPoint {
     }
   }
 
-
-
   @Test
   /**
    * This test makes a call to end point where there is no SCM server. We
@@ -273,7 +275,7 @@ public class TestEndPoint {
                   .createNodeReport(
                       getStorageReports(nodeToRegister.getUuid())),
               TestUtils.getRandomContainerReports(10),
-                  TestUtils.getRandomPipelineReports());
+              TestUtils.getRandomPipelineReports());
       Assert.assertNotNull(responseProto);
       Assert.assertEquals(nodeToRegister.getUuidString(),
           responseProto.getDatanodeUUID());
@@ -289,8 +291,10 @@ public class TestEndPoint {
     return TestUtils.createStorageReport(id, storagePath, 100, 10, 90, null);
   }
 
-  private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
-      int rpcTimeout, boolean clearDatanodeDetails) throws Exception {
+  private EndpointStateMachine registerTaskHelper(
+      InetSocketAddress scmAddress,
+      int rpcTimeout, boolean clearDatanodeDetails
+  ) throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
     EndpointStateMachine rpcEndPoint =
         createEndpoint(conf,
@@ -304,7 +308,7 @@ public class TestEndPoint {
         TestUtils.getRandomContainerReports(10));
     when(ozoneContainer.getController()).thenReturn(controller);
     when(ozoneContainer.getPipelineReport()).thenReturn(
-            TestUtils.getRandomPipelineReports());
+        TestUtils.getRandomPipelineReports());
     RegisterEndpointTask endpointTask =
         new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer,
             mock(StateContext.class));
@@ -319,7 +323,7 @@ public class TestEndPoint {
   @Test
   public void testRegisterTask() throws Exception {
     try (EndpointStateMachine rpcEndpoint =
-             registerTaskHelper(serverAddress, 1000, false)) {
+        registerTaskHelper(serverAddress, 1000, false)) {
       // Successful register should move us to Heartbeat state.
       Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
           rpcEndpoint.getState());
@@ -330,7 +334,7 @@ public class TestEndPoint {
   public void testRegisterToInvalidEndpoint() throws Exception {
     InetSocketAddress address = SCMTestUtils.getReuseableAddress();
     try (EndpointStateMachine rpcEndpoint =
-             registerTaskHelper(address, 1000, false)) {
+        registerTaskHelper(address, 1000, false)) {
       Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
           rpcEndpoint.getState());
     }
@@ -340,7 +344,7 @@ public class TestEndPoint {
   public void testRegisterNoContainerID() throws Exception {
     InetSocketAddress address = SCMTestUtils.getReuseableAddress();
     try (EndpointStateMachine rpcEndpoint =
-             registerTaskHelper(address, 1000, true)) {
+        registerTaskHelper(address, 1000, true)) {
       // No Container ID, therefore we tell the datanode that we would like to
       // shutdown.
       Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
@@ -364,8 +368,8 @@ public class TestEndPoint {
   public void testHeartbeat() throws Exception {
     DatanodeDetails dataNode = randomDatanodeDetails();
     try (EndpointStateMachine rpcEndPoint =
-             createEndpoint(SCMTestUtils.getConf(),
-                 serverAddress, 1000)) {
+        createEndpoint(SCMTestUtils.getConf(),
+            serverAddress, 1000)) {
       SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
           .setDatanodeDetails(dataNode.getProtoBufMessage())
           .setNodeReport(TestUtils.createNodeReport(
@@ -388,7 +392,6 @@ public class TestEndPoint {
       // Add some scmCommands for heartbeat response
       addScmCommands();
 
-
       SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
           .setDatanodeDetails(dataNode.getProtoBufMessage())
           .setNodeReport(TestUtils.createNodeReport(
@@ -419,17 +422,17 @@ public class TestEndPoint {
     SCMCommandProto closeCommand = SCMCommandProto.newBuilder()
         .setCloseContainerCommandProto(
             CloseContainerCommandProto.newBuilder().setCmdId(1)
-        .setContainerID(1)
-        .setPipelineID(PipelineID.randomId().getProtobuf())
-        .build())
+                .setContainerID(1)
+                .setPipelineID(PipelineID.randomId().getProtobuf())
+                .build())
         .setCommandType(Type.closeContainerCommand)
         .build();
     SCMCommandProto replicationCommand = SCMCommandProto.newBuilder()
         .setReplicateContainerCommandProto(
             ReplicateContainerCommandProto.newBuilder()
-        .setCmdId(2)
-        .setContainerID(2)
-        .build())
+                .setCmdId(2)
+                .setContainerID(2)
+                .build())
         .setCommandType(Type.replicateContainerCommand)
         .build();
     SCMCommandProto deleteBlockCommand = SCMCommandProto.newBuilder()
@@ -450,8 +453,10 @@ public class TestEndPoint {
     scmServerImpl.addScmCommandRequest(replicationCommand);
   }
 
-  private StateContext heartbeatTaskHelper(InetSocketAddress scmAddress,
-      int rpcTimeout) throws Exception {
+  private StateContext heartbeatTaskHelper(
+      InetSocketAddress scmAddress,
+      int rpcTimeout
+  ) throws Exception {
     OzoneConfiguration conf = SCMTestUtils.getConf();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -460,11 +465,10 @@ public class TestEndPoint {
     // hard coding once we fix the Ratis default behaviour.
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
 
-
     // Create a datanode state machine for stateConext used by endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
         randomDatanodeDetails(), conf, null, null);
-         EndpointStateMachine rpcEndPoint =
+        EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
           randomDatanodeDetails().getProtoBufMessage();
diff --git a/hadoop-ozone/dev-support/intellij/ozone-site.xml 
b/hadoop-ozone/dev-support/intellij/ozone-site.xml
index 3fde850..e691d91 100644
--- a/hadoop-ozone/dev-support/intellij/ozone-site.xml
+++ b/hadoop-ozone/dev-support/intellij/ozone-site.xml
@@ -67,4 +67,8 @@
     <name>ozone.recon.db.dir</name>
     <value>/tmp/recon</value>
   </property>
+  <property>
+    <name>datanode.replication.port</name>
+    <value>0</value>
+  </property>
 </configuration>
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 629ab5a..c955948 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -788,6 +789,8 @@ public class MiniOzoneClusterImpl implements 
MiniOzoneCluster {
           randomContainerPort);
       conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
           randomContainerPort);
+
+      conf.setFromObject(new ReplicationConfig().setPort(0));
     }
 
     private void configureTrace() {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index ae8aae9..b25d4b0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -47,9 +47,6 @@ import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGr
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
-import 
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import com.google.common.collect.Maps;
@@ -74,12 +71,6 @@ public class TestContainerMetrics {
   @Rule
   public Timeout timeout = new Timeout(300000);
 
-  private GrpcReplicationService createReplicationService(
-      ContainerController controller) {
-    return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(controller));
-  }
-
   @Test
   public void testContainerMetrics() throws Exception {
     XceiverServerGrpc server = null;
@@ -123,9 +114,7 @@ public class TestContainerMetrics {
           volumeSet, handlers, context, metrics, null);
       dispatcher.setScmId(UUID.randomUUID().toString());
 
-      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null,
-          createReplicationService(new ContainerController(
-              containerSet, handlers)));
+      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null);
       client = new XceiverClientGrpc(pipeline, conf);
 
       server.start();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 77ca936..a29453d 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -18,65 +18,60 @@
 
 package org.apache.hadoop.ozone.container.server;
 
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import 
org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.RatisTestHelper;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
-import 
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.RatisTestHelper;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
+
+import com.google.common.collect.Maps;
+import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import org.apache.ratis.rpc.RpcType;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
+import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
-import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -89,12 +84,6 @@ public class TestContainerServer {
   private static final OzoneConfiguration CONF = new OzoneConfiguration();
   private static CertificateClient caClient;
 
-  private GrpcReplicationService createReplicationService(
-      ContainerController containerController) {
-    return new GrpcReplicationService(
-        new OnDemandContainerReplicationSource(containerController));
-  }
-
   @BeforeClass
   static public void setup() {
     CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
@@ -113,8 +102,7 @@ public class TestContainerServer {
                     .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
         XceiverClientGrpc::new,
         (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
-            new TestContainerDispatcher(), caClient,
-            createReplicationService(controller)), (dn, p) -> {
+            new TestContainerDispatcher(), caClient), (dn, p) -> {
         });
   }
 
@@ -238,8 +226,7 @@ public class TestContainerServer {
       dispatcher.init();
 
       server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
-          caClient, createReplicationService(
-              new ContainerController(containerSet, null)));
+          caClient);
       client = new XceiverClientGrpc(pipeline, conf);
 
       server.start();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index c319c1a..f050e2a 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.ozone.container.server;
 
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -31,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -55,8 +61,8 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import 
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import 
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
@@ -65,35 +71,33 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
-import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getPutBlockRequest;
+import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getTestBlockID;
+import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getTestContainerID;
+import static 
org.apache.hadoop.ozone.container.ContainerTestHelper.getWriteChunkRequest;
 import org.apache.ratis.rpc.RpcType;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.junit.After;
 import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Consumer;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
-import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.*;
-import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
-import static org.junit.Assert.*;
-
 /**
  * Test Container servers when security is enabled.
  */
@@ -138,8 +142,7 @@ public class TestSecureContainerServer {
                     .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
         XceiverClientGrpc::new,
         (dn, conf) -> new XceiverServerGrpc(dd, conf,
-            hddsDispatcher, caClient,
-            createReplicationService(controller)), (dn, p) -> {}, (p) -> {});
+            hddsDispatcher, caClient), (dn, p) -> {}, (p) -> {});
   }
 
   private static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID 
scmId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to