This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new aed64de760 HDDS-7777. Implement container replication in push model 
(#4197)
aed64de760 is described below

commit aed64de760b9860650d8bfb435c15e07f1d802af
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jan 24 12:32:11 2023 +0100

    HDDS-7777. Implement container replication in push model (#4197)
---
 .../common/statemachine/DatanodeStateMachine.java  |  27 +++--
 .../ReplicateContainerCommandHandler.java          |   5 +-
 .../container/keyvalue/KeyValueContainer.java      |   4 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   6 +-
 ...mportReplicator.java => ContainerImporter.java} | 134 ++++++---------------
 .../container/replication/ContainerUploader.java   |  32 +++++
 .../replication/CopyContainerResponseStream.java   |  47 ++++++++
 .../replication/DownloadAndImportReplicator.java   | 110 ++---------------
 .../replication/GrpcContainerUploader.java         | 100 +++++++++++++++
 .../container/replication/GrpcOutputStream.java    |  41 ++++---
 .../replication/GrpcReplicationClient.java         |  16 +--
 .../replication/GrpcReplicationService.java        |  20 ++-
 .../container/replication/PushReplicator.java      |  70 +++++++++++
 .../container/replication/ReplicationServer.java   |   7 +-
 .../replication/ReplicationSupervisor.java         |  59 ++++-----
 .../container/replication/ReplicationTask.java     |  49 +++-----
 .../replication/SendContainerOutputStream.java     |  43 +++++++
 .../replication/SendContainerRequestHandler.java   | 131 ++++++++++++++++++++
 .../replication/SimpleContainerDownloader.java     |   9 +-
 .../commands/ReplicateContainerCommand.java        |  57 +++++++--
 .../common/statemachine/TestStateContext.java      |   9 +-
 ...OutputStream.java => GrpcOutputStreamTest.java} |  46 ++++---
 .../ReplicationSupervisorScheduling.java           |   6 +-
 .../TestCopyContainerResponseStream.java           |  50 ++++++++
 .../replication/TestMeasuredReplicator.java        |  21 ++--
 .../replication/TestReplicationSupervisor.java     |  24 ++--
 .../upgrade/TestDatanodeUpgradeToScmHA.java        |   8 +-
 .../src/main/proto/DatanodeClientProtocol.proto    |  11 ++
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   1 +
 .../replication/ECMisReplicationHandler.java       |  16 +--
 .../replication/ECUnderReplicationHandler.java     |  40 +++---
 .../replication/LegacyReplicationManager.java      |  21 ++--
 .../replication/MisReplicationHandler.java         |  20 ++-
 .../replication/RatisMisReplicationHandler.java    |  12 +-
 .../replication/RatisUnderReplicationHandler.java  |  32 ++++-
 .../container/replication/ReplicationManager.java  |  23 +++-
 .../replication/TestECMisReplicationHandler.java   |   3 +-
 .../replication/TestECUnderReplicationHandler.java |   4 +
 .../TestRatisMisReplicationHandler.java            |   3 +-
 .../TestRatisUnderReplicationHandler.java          |  11 +-
 .../replication/TestReplicationManager.java        |   4 +-
 .../replication/TestUnderReplicatedProcessor.java  |   2 +-
 .../src/main/compose/ozonesecure/docker-config     |   1 +
 .../hdds/scm/TestSCMDatanodeProtocolServer.java    |   4 +-
 .../ozone/freon/ClosedContainerReplicator.java     |  15 +--
 45 files changed, 913 insertions(+), 441 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 4b8f0be1e3..6da466d8c0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -60,9 +60,13 @@ import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetri
 import 
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.GrpcContainerUploader;
 import org.apache.hadoop.ozone.container.replication.MeasuredReplicator;
+import 
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
+import org.apache.hadoop.ozone.container.replication.PushReplicator;
 import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
 import 
org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
@@ -171,19 +175,26 @@ public class DatanodeStateMachine implements Closeable {
     dnCertClient = certClient;
     nextHB = new AtomicLong(Time.monotonicNow());
 
-    ContainerReplicator replicator =
-        new DownloadAndImportReplicator(conf, container.getContainerSet(),
-            container.getController(),
-            new SimpleContainerDownloader(conf, dnCertClient),
-            new TarContainerPacker(), container.getVolumeSet());
-
-    replicatorMetrics = new MeasuredReplicator(replicator);
+    ContainerImporter importer = new ContainerImporter(conf,
+        container.getContainerSet(),
+        container.getController(),
+        new TarContainerPacker(), container.getVolumeSet());
+    ContainerReplicator pullReplicator = new DownloadAndImportReplicator(
+        importer,
+        new SimpleContainerDownloader(conf, dnCertClient));
+    ContainerReplicator pushReplicator = new PushReplicator(
+        // TODO compression, metrics
+        new OnDemandContainerReplicationSource(container.getController()),
+        new GrpcContainerUploader(conf, dnCertClient)
+    );
+
+    replicatorMetrics = new MeasuredReplicator(pullReplicator);
 
     ReplicationConfig replicationConfig =
         conf.getObject(ReplicationConfig.class);
     supervisor =
         new ReplicationSupervisor(container.getContainerSet(), context,
-            replicatorMetrics, replicationConfig, clock);
+            replicatorMetrics, pushReplicator, replicationConfig, clock);
 
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index df589e287d..6d281adc9c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -66,10 +66,11 @@ public class ReplicateContainerCommandHandler implements 
CommandHandler {
     final List<DatanodeDetails> sourceDatanodes =
         replicateCommand.getSourceDatanodes();
     final long containerID = replicateCommand.getContainerID();
+    final DatanodeDetails target = replicateCommand.getTargetDatanode();
 
-    Preconditions.checkArgument(sourceDatanodes.size() > 0,
+    Preconditions.checkArgument(!sourceDatanodes.isEmpty() || target != null,
         "Replication command is received for container %s "
-            + "without source datanodes.", containerID);
+            + "without source or target datanodes.", containerID);
 
     ReplicationTask task = new ReplicationTask(replicateCommand);
     supervisor.addTask(task);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index e2bafc080b..e1ee881142 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -58,7 +58,7 @@ import 
org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
@@ -518,7 +518,7 @@ public class KeyValueContainer implements 
Container<KeyValueContainerData> {
     Path destContainerDir =
         Paths.get(KeyValueContainerLocationUtil.getBaseContainerLocation(
             hddsVolume.getHddsRootDir().toString(), idDir, containerId));
-    Path tmpDir = DownloadAndImportReplicator.getUntarDirectory(hddsVolume);
+    Path tmpDir = ContainerImporter.getUntarDirectory(hddsVolume);
     writeLock();
     try {
       //copy the values from the input stream to the final destination
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 5d2153465e..3c63aaa799 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -53,8 +53,10 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import 
org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
 import 
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import 
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import 
org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
@@ -204,7 +206,9 @@ public class OzoneContainer {
         controller,
         conf.getObject(ReplicationConfig.class),
         secConf,
-        certClient);
+        certClient,
+        new ContainerImporter(conf, containerSet, controller,
+            new TarContainerPacker(), volumeSet));
 
     readChannel = new XceiverServerGrpc(
         datanodeDetails, config, hddsDispatcher, certClient);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
similarity index 50%
copy from 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
copy to 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index c1d99c9148..59316a8c8f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -1,32 +1,24 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -39,58 +31,51 @@ import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingP
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 
 /**
- * Default replication implementation.
- * <p>
- * This class does the real job. Executes the download and import the container
- * to the container set.
+ * Imports container from tarball.
  */
-public class DownloadAndImportReplicator implements ContainerReplicator {
+public class ContainerImporter {
 
-  public static final Logger LOG =
-      LoggerFactory.getLogger(DownloadAndImportReplicator.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerImporter.class);
 
   public static final String CONTAINER_COPY_DIR = "container-copy";
-  public static final String CONTAINER_COPY_TMP_DIR = "tmp";
-
+  private static final String CONTAINER_COPY_TMP_DIR = "tmp";
   private final ContainerSet containerSet;
   private final ContainerController controller;
-  private final ContainerDownloader downloader;
   private final TarContainerPacker packer;
   private final MutableVolumeSet volumeSet;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
   private final long containerSize;
 
-  public DownloadAndImportReplicator(
-      ConfigurationSource conf,
-      ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer,
+  public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
+      ContainerController controller, TarContainerPacker tarContainerPacker,
       MutableVolumeSet volumeSet) {
     this.containerSet = containerSet;
     this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
+    this.packer = tarContainerPacker;
     this.volumeSet = volumeSet;
     try {
-      this.volumeChoosingPolicy = conf.getClass(
+      volumeChoosingPolicy = conf.getClass(
           HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
               .class, VolumeChoosingPolicy.class).newInstance();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    this.containerSize = (long) conf.getStorageSize(
+    containerSize = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
   }
 
   public void importContainer(long containerID, Path tarFilePath,
@@ -100,72 +85,33 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
     if (targetVolume == null) {
       targetVolume = chooseNextVolume();
     }
-    KeyValueContainerData originalContainerData;
     try {
-      try (FileInputStream tmpContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
+      KeyValueContainerData containerData;
+
+      try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
         byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tmpContainerTarStream);
-        originalContainerData = (KeyValueContainerData) ContainerDataYaml
+            packer.unpackContainerDescriptor(input);
+        containerData = (KeyValueContainerData) ContainerDataYaml
             .readContainer(containerDescriptorYaml);
       }
-      originalContainerData.setVolume(targetVolume);
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
+      containerData.setVolume(targetVolume);
 
+      try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
         Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
+            containerData, input, packer);
         containerSet.addContainer(container);
       }
-
     } finally {
       try {
         Files.delete(tarFilePath);
       } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
-    }
-  }
-
-  @Override
-  public void replicate(ReplicationTask task) {
-    long containerID = task.getContainerId();
-
-    List<DatanodeDetails> sourceDatanodes = task.getSources();
-
-    LOG.info("Starting replication of container {} from {}", containerID,
-        sourceDatanodes);
-
-    try {
-      HddsVolume targetVolume = chooseNextVolume();
-      // Wait for the download. This thread pool is limiting the parallel
-      // downloads, so it's ok to block here and wait for the full download.
-      Path tarFilePath =
-          downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
-              getUntarDirectory(targetVolume));
-      if (tarFilePath == null) {
-        task.setStatus(Status.FAILED);
-        return;
+        LOG.error("Got exception while deleting temporary container file: "
+            + tarFilePath.toAbsolutePath(), ex);
       }
-      long bytes = Files.size(tarFilePath);
-      LOG.info("Container {} is downloaded with size {}, starting to import.",
-              containerID, bytes);
-      task.setTransferredBytes(bytes);
-
-      importContainer(containerID, tarFilePath, targetVolume);
-
-      LOG.info("Container {} is replicated successfully", containerID);
-      task.setStatus(Status.DONE);
-    } catch (IOException e) {
-      LOG.error("Container {} replication was unsuccessful.", containerID, e);
-      task.setStatus(Status.FAILED);
     }
   }
 
-  private HddsVolume chooseNextVolume() throws IOException {
+  HddsVolume chooseNextVolume() throws IOException {
     // Choose volume that can hold both container in tmp and dest directory
     return volumeChoosingPolicy.chooseVolume(
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
@@ -177,8 +123,4 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
     return Paths.get(hddsVolume.getVolumeRootDir())
         .resolve(CONTAINER_COPY_TMP_DIR).resolve(CONTAINER_COPY_DIR);
   }
-
-  private List<HddsVolume> getHddsVolumesList() {
-    return StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
-  }
-}
+}
\ No newline at end of file
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
new file mode 100644
index 0000000000..cb9264f2b5
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client-side interface for sending a container to a target datanode.
+ */
+public interface ContainerUploader {
+  OutputStream startUpload(long containerId, DatanodeDetails target,
+      CompletableFuture<Void> callback) throws IOException;
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
new file mode 100644
index 0000000000..3cb6dfc7f5
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+
+/**
+ * Output stream adapter for CopyContainerResponse.
+ */
+class CopyContainerResponseStream
+    extends GrpcOutputStream<CopyContainerResponseProto> {
+
+  CopyContainerResponseStream(
+      StreamObserver<CopyContainerResponseProto> streamObserver,
+      long containerId, int bufferSize) {
+    super(streamObserver, containerId, bufferSize);
+  }
+
+  protected void sendPart(boolean eof, int length, ByteString data) {
+    CopyContainerResponseProto response =
+        CopyContainerResponseProto.newBuilder()
+            .setContainerID(getContainerId())
+            .setData(data)
+            .setEof(eof)
+            .setReadOffset(getWrittenBytes())
+            .setLen(length)
+            .build();
+    getStreamObserver().onNext(response);
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index c1d99c9148..59f165f93b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -17,35 +17,18 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.List;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import 
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-
 /**
  * Default replication implementation.
  * <p>
@@ -57,77 +40,14 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
   public static final Logger LOG =
       LoggerFactory.getLogger(DownloadAndImportReplicator.class);
 
-  public static final String CONTAINER_COPY_DIR = "container-copy";
-  public static final String CONTAINER_COPY_TMP_DIR = "tmp";
-
-  private final ContainerSet containerSet;
-  private final ContainerController controller;
   private final ContainerDownloader downloader;
-  private final TarContainerPacker packer;
-  private final MutableVolumeSet volumeSet;
-  private final VolumeChoosingPolicy volumeChoosingPolicy;
-  private final long containerSize;
+  private final ContainerImporter containerImporter;
 
   public DownloadAndImportReplicator(
-      ConfigurationSource conf,
-      ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer,
-      MutableVolumeSet volumeSet) {
-    this.containerSet = containerSet;
-    this.controller = controller;
+      ContainerImporter containerImporter,
+      ContainerDownloader downloader) {
     this.downloader = downloader;
-    this.packer = packer;
-    this.volumeSet = volumeSet;
-    try {
-      this.volumeChoosingPolicy = conf.getClass(
-          HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
-              .class, VolumeChoosingPolicy.class).newInstance();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    this.containerSize = (long) conf.getStorageSize(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
-  }
-
-  public void importContainer(long containerID, Path tarFilePath,
-      HddsVolume hddsVolume) throws IOException {
-
-    HddsVolume targetVolume = hddsVolume;
-    if (targetVolume == null) {
-      targetVolume = chooseNextVolume();
-    }
-    KeyValueContainerData originalContainerData;
-    try {
-      try (FileInputStream tmpContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tmpContainerTarStream);
-        originalContainerData = (KeyValueContainerData) ContainerDataYaml
-            .readContainer(containerDescriptorYaml);
-      }
-      originalContainerData.setVolume(targetVolume);
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
-    }
+    this.containerImporter = containerImporter;
   }
 
   @Override
@@ -140,12 +60,12 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
         sourceDatanodes);
 
     try {
-      HddsVolume targetVolume = chooseNextVolume();
+      HddsVolume targetVolume = containerImporter.chooseNextVolume();
       // Wait for the download. This thread pool is limiting the parallel
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
           downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
-              getUntarDirectory(targetVolume));
+              ContainerImporter.getUntarDirectory(targetVolume));
       if (tarFilePath == null) {
         task.setStatus(Status.FAILED);
         return;
@@ -155,7 +75,7 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
               containerID, bytes);
       task.setTransferredBytes(bytes);
 
-      importContainer(containerID, tarFilePath, targetVolume);
+      containerImporter.importContainer(containerID, tarFilePath, 
targetVolume);
 
       LOG.info("Container {} is replicated successfully", containerID);
       task.setStatus(Status.DONE);
@@ -165,20 +85,4 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
     }
   }
 
-  private HddsVolume chooseNextVolume() throws IOException {
-    // Choose volume that can hold both container in tmp and dest directory
-    return volumeChoosingPolicy.chooseVolume(
-        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
-        containerSize * 2);
-  }
-
-  public static Path getUntarDirectory(HddsVolume hddsVolume)
-      throws IOException {
-    return Paths.get(hddsVolume.getVolumeRootDir())
-        .resolve(CONTAINER_COPY_TMP_DIR).resolve(CONTAINER_COPY_DIR);
-  }
-
-  private List<HddsVolume> getHddsVolumesList() {
-    return StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
-  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
new file mode 100644
index 0000000000..aa7322428e
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Uploads container to target datanode via gRPC.
+ */
+public class GrpcContainerUploader implements ContainerUploader {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcContainerUploader.class);
+
+  private final SecurityConfig securityConfig;
+  private final CertificateClient certClient;
+  private final CopyContainerCompression compression;
+
+  public GrpcContainerUploader(
+      ConfigurationSource conf, CertificateClient certClient) {
+    this.certClient = certClient;
+    securityConfig = new SecurityConfig(conf);
+    compression = CopyContainerCompression.getConf(conf);
+  }
+
+  @Override
+  public OutputStream startUpload(long containerId, DatanodeDetails target,
+      CompletableFuture<Void> callback) throws IOException {
+    GrpcReplicationClient client =
+        new GrpcReplicationClient(target.getIpAddress(),
+            target.getPort(Port.Name.REPLICATION).getValue(),
+            securityConfig, certClient, compression.toString());
+    StreamObserver<SendContainerRequest> requestStream = client.upload(
+        new SendContainerResponseStreamObserver(containerId, target, 
callback));
+    return new SendContainerOutputStream(requestStream, containerId,
+        GrpcReplicationService.BUFFER_SIZE);
+  }
+
+  /**
+   * Completes the upload callback based on responses from the target datanode.
+   */
+  private static class SendContainerResponseStreamObserver
+      implements StreamObserver<SendContainerResponse> {
+    private final long containerId;
+    private final DatanodeDetails target;
+    private final CompletableFuture<Void> callback;
+
+    SendContainerResponseStreamObserver(long containerId,
+        DatanodeDetails target, CompletableFuture<Void> callback) {
+      this.containerId = containerId;
+      this.target = target;
+      this.callback = callback;
+    }
+
+    @Override
+    public void onNext(SendContainerResponse sendContainerResponse) {
+      LOG.info("Response for upload container {} to {}", containerId, target);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      LOG.warn("Failed to upload container {} to {}", containerId, target, t);
+      callback.completeExceptionally(t);
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.info("Finished uploading container {} to {}", containerId, target);
+      callback.complete(null);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
index c09c8f6743..280e8cef75 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
@@ -31,12 +30,12 @@ import java.io.OutputStream;
  * Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
  * Data is buffered in a limited buffer of the specified size.
  */
-class GrpcOutputStream extends OutputStream {
+abstract class GrpcOutputStream<T> extends OutputStream {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(GrpcOutputStream.class);
 
-  private final StreamObserver<CopyContainerResponseProto> responseObserver;
+  private final StreamObserver<T> streamObserver;
 
   private final ByteString.Output buffer;
 
@@ -46,10 +45,9 @@ class GrpcOutputStream extends OutputStream {
 
   private long writtenBytes;
 
-  GrpcOutputStream(
-      StreamObserver<CopyContainerResponseProto> responseObserver,
+  GrpcOutputStream(StreamObserver<T> streamObserver,
       long containerId, int bufferSize) {
-    this.responseObserver = responseObserver;
+    this.streamObserver = streamObserver;
     this.containerId = containerId;
     this.bufferSize = bufferSize;
     buffer = ByteString.newOutput(bufferSize);
@@ -63,7 +61,7 @@ class GrpcOutputStream extends OutputStream {
         flushBuffer(false);
       }
     } catch (Exception ex) {
-      responseObserver.onError(ex);
+      streamObserver.onError(ex);
     }
   }
 
@@ -94,7 +92,7 @@ class GrpcOutputStream extends OutputStream {
         len = Math.min(bufferSize, remaining);
       }
     } catch (Exception ex) {
-      responseObserver.onError(ex);
+      streamObserver.onError(ex);
     }
   }
 
@@ -103,27 +101,34 @@ class GrpcOutputStream extends OutputStream {
     flushBuffer(true);
     LOG.info("Sent {} bytes for container {}",
         writtenBytes, containerId);
-    responseObserver.onCompleted();
+    streamObserver.onCompleted();
     buffer.close();
   }
 
+  protected long getContainerId() {
+    return containerId;
+  }
+
+  protected long getWrittenBytes() {
+    return writtenBytes;
+  }
+
+  protected StreamObserver<T> getStreamObserver() {
+    return streamObserver;
+  }
+
   private void flushBuffer(boolean eof) {
     int length = buffer.size();
     if (length > 0) {
       ByteString data = buffer.toByteString();
       LOG.debug("Sending {} bytes (of type {}) for container {}",
           length, data.getClass().getSimpleName(), containerId);
-      CopyContainerResponseProto response =
-          CopyContainerResponseProto.newBuilder()
-              .setContainerID(containerId)
-              .setData(data)
-              .setEof(eof)
-              .setReadOffset(writtenBytes)
-              .setLen(length)
-              .build();
-      responseObserver.onNext(response);
+      sendPart(eof, length, data);
       writtenBytes += length;
       buffer.reset();
     }
   }
+
+  protected abstract void sendPart(boolean eof, int length, ByteString data);
+
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index fa4140040c..be9a0ece48 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
 import org.apache.hadoop.hdds.security.ssl.KeyStoresFactory;
@@ -60,12 +62,10 @@ public class GrpcReplicationClient implements AutoCloseable 
{
 
   private final IntraDatanodeProtocolServiceStub client;
 
-  private final Path workingDirectory;
-
   private final ContainerProtos.CopyContainerCompressProto compression;
 
   public GrpcReplicationClient(
-      String host, int port, Path workingDir,
+      String host, int port,
       SecurityConfig secConfig, CertificateClient certClient,
       String compression)
       throws IOException {
@@ -92,12 +92,11 @@ public class GrpcReplicationClient implements AutoCloseable 
{
     }
     channel = channelBuilder.build();
     client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
-    workingDirectory = workingDir;
     this.compression =
         ContainerProtos.CopyContainerCompressProto.valueOf(compression);
   }
 
-  public CompletableFuture<Path> download(long containerId) {
+  public CompletableFuture<Path> download(long containerId, Path dir) {
     CopyContainerRequestProto request =
         CopyContainerRequestProto.newBuilder()
             .setContainerID(containerId)
@@ -108,7 +107,7 @@ public class GrpcReplicationClient implements AutoCloseable 
{
 
     CompletableFuture<Path> response = new CompletableFuture<>();
 
-    Path destinationPath = getWorkingDirectory()
+    Path destinationPath = dir
         .resolve(ContainerUtils.getContainerTarName(containerId));
 
     client.download(request,
@@ -117,8 +116,9 @@ public class GrpcReplicationClient implements AutoCloseable 
{
     return response;
   }
 
-  private Path getWorkingDirectory() {
-    return workingDirectory;
+  public StreamObserver<SendContainerRequest> upload(
+      StreamObserver<SendContainerResponse> responseObserver) {
+    return client.upload(responseObserver);
   }
 
   public void shutdown() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 7246ae99f0..533a3b40fe 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -19,9 +19,12 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
 
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -37,12 +40,15 @@ public class GrpcReplicationService extends
   private static final Logger LOG =
       LoggerFactory.getLogger(GrpcReplicationService.class);
 
-  private static final int BUFFER_SIZE = 1024 * 1024;
+  static final int BUFFER_SIZE = 1024 * 1024;
 
   private final ContainerReplicationSource source;
+  private final ContainerImporter importer;
 
-  public GrpcReplicationService(ContainerReplicationSource source) {
+  public GrpcReplicationService(ContainerReplicationSource source,
+      ContainerImporter importer) {
     this.source = source;
+    this.importer = importer;
   }
 
   @Override
@@ -55,8 +61,8 @@ public class GrpcReplicationService extends
     LOG.info("Streaming container data ({}) to other datanode " +
         "with compression {}", containerID, compression);
     try {
-      GrpcOutputStream outputStream =
-          new GrpcOutputStream(responseObserver, containerID, BUFFER_SIZE);
+      OutputStream outputStream = new CopyContainerResponseStream(
+          responseObserver, containerID, BUFFER_SIZE);
       source.copyData(containerID, outputStream, compression);
     } catch (IOException e) {
       LOG.error("Error streaming container {}", containerID, e);
@@ -64,4 +70,10 @@ public class GrpcReplicationService extends
     }
   }
 
+  @Override
+  public StreamObserver<SendContainerRequest> upload(
+      StreamObserver<SendContainerResponse> responseObserver) {
+
+    return new SendContainerRequestHandler(importer, responseObserver);
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
new file mode 100644
index 0000000000..aa1a2966f7
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+
+/**
+ * Pushes the container to the target datanode.
+ */
+public class PushReplicator implements ContainerReplicator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PushReplicator.class);
+
+  private final ContainerReplicationSource source;
+  private final ContainerUploader uploader;
+
+  public PushReplicator(ContainerReplicationSource source,
+      ContainerUploader uploader) {
+    this.source = source;
+    this.uploader = uploader;
+  }
+
+  @Override
+  public void replicate(ReplicationTask task) {
+    long containerID = task.getContainerId();
+    DatanodeDetails target = task.getTarget();
+    CompletableFuture<Void> fut = new CompletableFuture<>();
+
+    source.prepare(containerID);
+
+    OutputStream output = null;
+    try {
+      output = uploader.startUpload(containerID, target, fut);
+      source.copyData(containerID, output, NO_COMPRESSION.name());
+      fut.get();
+      task.setStatus(Status.DONE);
+    } catch (Exception e) {
+      LOG.warn("Container {} replication was unsuccessful.", containerID, e);
+      task.setStatus(Status.FAILED);
+    } finally {
+      // output may have already been closed, ignore such errors
+      IOUtils.cleanupWithLogger(LOG, output);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index fcad690d4f..cdc1975360 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -59,13 +59,15 @@ public class ReplicationServer {
   private ContainerController controller;
 
   private int port;
+  private final ContainerImporter importer;
 
   public ReplicationServer(ContainerController controller,
       ReplicationConfig replicationConfig, SecurityConfig secConf,
-      CertificateClient caClient) {
+      CertificateClient caClient, ContainerImporter importer) {
     this.secConf = secConf;
     this.caClient = caClient;
     this.controller = controller;
+    this.importer = importer;
     this.port = replicationConfig.getPort();
     init();
   }
@@ -74,7 +76,8 @@ public class ReplicationServer {
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
         .addService(ServerInterceptors.intercept(new GrpcReplicationService(
-            new OnDemandContainerReplicationSource(controller)
+            new OnDemandContainerReplicationSource(controller),
+            importer
         ), new GrpcServerInterceptor()));
 
     if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 3f61273885..482ca1ceb4 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.ozone.container.replication;
 
 import java.time.Clock;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentHashMap.KeySetView;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -48,7 +48,8 @@ public class ReplicationSupervisor {
       LoggerFactory.getLogger(ReplicationSupervisor.class);
 
   private final ContainerSet containerSet;
-  private final ContainerReplicator replicator;
+  private final ContainerReplicator pullReplicator;
+  private final ContainerReplicator pushReplicator;
   private final ExecutorService executor;
   private final StateContext context;
   private final Clock clock;
@@ -63,16 +64,18 @@ public class ReplicationSupervisor {
    * or queued for download. Tracked so we don't schedule > 1
    * concurrent download for the same container.
    */
-  private final KeySetView<Object, Boolean> containersInFlight;
+  private final Set<ReplicationTask> inFlight;
 
   @VisibleForTesting
   ReplicationSupervisor(
       ContainerSet containerSet, StateContext context,
-      ContainerReplicator replicator, ExecutorService executor,
+      ContainerReplicator pullReplicator, ContainerReplicator pushReplicator,
+      ExecutorService executor,
       Clock clock) {
     this.containerSet = containerSet;
-    this.replicator = replicator;
-    this.containersInFlight = ConcurrentHashMap.newKeySet();
+    this.pullReplicator = pullReplicator;
+    this.pushReplicator = pushReplicator;
+    this.inFlight = ConcurrentHashMap.newKeySet();
     this.executor = executor;
     this.context = context;
     this.clock = clock;
@@ -80,22 +83,24 @@ public class ReplicationSupervisor {
 
   public ReplicationSupervisor(
       ContainerSet containerSet, StateContext context,
-      ContainerReplicator replicator, ReplicationConfig replicationConfig,
-      Clock clock) {
-    this(containerSet, context, replicator, new ThreadPoolExecutor(
-        replicationConfig.getReplicationMaxStreams(),
-        replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>(),
-        new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("ContainerReplicationThread-%d")
-            .build()), clock);
+      ContainerReplicator pullReplicator, ContainerReplicator pushReplicator,
+      ReplicationConfig replicationConfig, Clock clock) {
+    this(containerSet, context, pullReplicator, pushReplicator,
+        new ThreadPoolExecutor(
+            replicationConfig.getReplicationMaxStreams(),
+            replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(),
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("ContainerReplicationThread-%d")
+                .build()),
+        clock);
   }
 
   /**
    * Queue an asynchronous download of the given container.
    */
   public void addTask(ReplicationTask task) {
-    if (containersInFlight.add(task.getContainerId())) {
+    if (inFlight.add(task)) {
       executor.execute(new TaskRunner(task));
     }
   }
@@ -125,7 +130,7 @@ public class ReplicationSupervisor {
    * @return Count of in-flight replications.
    */
   public int getInFlightReplications() {
-    return containersInFlight.size();
+    return inFlight.size();
   }
 
   /**
@@ -170,37 +175,35 @@ public class ReplicationSupervisor {
           }
         }
 
-        if (containerSet.getContainer(task.getContainerId()) != null) {
+        final boolean pull = task.getTarget() == null;
+        if (containerSet.getContainer(task.getContainerId()) != null && pull) {
           LOG.debug("Container {} has already been downloaded.", containerId);
           return;
         }
 
-        task.setStatus(Status.DOWNLOADING);
+        task.setStatus(Status.IN_PROGRESS);
+        ContainerReplicator replicator = pull ? pullReplicator : 
pushReplicator;
         replicator.replicate(task);
 
         if (task.getStatus() == Status.FAILED) {
-          LOG.error(
-              "Container {} can't be downloaded from any of the datanodes.",
-              containerId);
+          LOG.error("Failed {}", this);
           failureCounter.incrementAndGet();
         } else if (task.getStatus() == Status.DONE) {
-          LOG.info("Container {} is replicated.", containerId);
+          LOG.info("Successful {}", this);
           successCounter.incrementAndGet();
         }
       } catch (Exception e) {
         task.setStatus(Status.FAILED);
-        LOG.error("Encountered error while replicating container {}.",
-            containerId, e);
+        LOG.error("Failed {}", this, e);
         failureCounter.incrementAndGet();
       } finally {
-        containersInFlight.remove(containerId);
+        inFlight.remove(task);
       }
     }
 
     @Override
     public String toString() {
-      return "replicate container command for container "
-          + task.getContainerId();
+      return task.getCommand().toString();
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
index 7c57a73b33..1e90ff0339 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -31,15 +31,9 @@ public class ReplicationTask {
 
   private volatile Status status = Status.QUEUED;
 
-  private final long containerId;
-
-  private final List<DatanodeDetails> sources;
-
   private final Instant queued = Instant.now();
 
-  private final long deadlineMsSinceEpoch;
-
-  private final long term;
+  private final ReplicateContainerCommand cmd;
 
   /**
    * Counter for the transferred bytes.
@@ -47,20 +41,7 @@ public class ReplicationTask {
   private long transferredBytes;
 
   public ReplicationTask(ReplicateContainerCommand cmd) {
-    this.containerId = cmd.getContainerID();
-    this.sources = cmd.getSourceDatanodes();
-    this.deadlineMsSinceEpoch = cmd.getDeadline();
-    this.term = cmd.getTerm();
-  }
-
-  /**
-   * Intended to only be used in tests.
-   */
-  protected ReplicationTask(
-      long containerId,
-      List<DatanodeDetails> sources
-  ) {
-    this(new ReplicateContainerCommand(containerId, sources));
+    this.cmd = cmd;
   }
 
   /**
@@ -68,7 +49,7 @@ public class ReplicationTask {
    * A returned value of zero indicates no deadline.
    */
   public long getDeadline() {
-    return deadlineMsSinceEpoch;
+    return cmd.getDeadline();
   }
 
   @Override
@@ -80,20 +61,21 @@ public class ReplicationTask {
       return false;
     }
     ReplicationTask that = (ReplicationTask) o;
-    return containerId == that.containerId;
+    return getContainerId() == that.getContainerId() &&
+        Objects.equals(getTarget(), that.getTarget());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(containerId);
+    return Objects.hash(getContainerId(), getTarget());
   }
 
   public long getContainerId() {
-    return containerId;
+    return cmd.getContainerID();
   }
 
   public List<DatanodeDetails> getSources() {
-    return sources;
+    return cmd.getSourceDatanodes();
   }
 
   public Status getStatus() {
@@ -108,8 +90,7 @@ public class ReplicationTask {
   public String toString() {
     return "ReplicationTask{" +
         "status=" + status +
-        ", containerId=" + containerId +
-        ", sources=" + sources +
+        ", cmd={" + cmd + "}" +
         ", queued=" + queued +
         '}';
   }
@@ -127,7 +108,15 @@ public class ReplicationTask {
   }
 
   long getTerm() {
-    return term;
+    return cmd.getTerm();
+  }
+
+  DatanodeDetails getTarget() {
+    return cmd.getTargetDatanode();
+  }
+
+  ReplicateContainerCommand getCommand() {
+    return cmd;
   }
 
   /**
@@ -135,7 +124,7 @@ public class ReplicationTask {
    */
   public enum Status {
     QUEUED,
-    DOWNLOADING,
+    IN_PROGRESS,
     FAILED,
     DONE
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
new file mode 100644
index 0000000000..622bd4f4a3
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+
+/**
+ * Output stream adapter for SendContainerRequest.
+ */
+class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> 
{
+  SendContainerOutputStream(
+      StreamObserver<SendContainerRequest> streamObserver,
+      long containerId, int bufferSize) {
+    super(streamObserver, containerId, bufferSize);
+  }
+
+  @Override
+  protected void sendPart(boolean eof, int length, ByteString data) {
+    SendContainerRequest request = SendContainerRequest.newBuilder()
+        .setContainerID(getContainerId())
+        .setData(data)
+        .setOffset(getWrittenBytes())
+        .build();
+    getStreamObserver().onNext(request);
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
new file mode 100644
index 0000000000..b12aad452c
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.apache.ratis.util.Preconditions.assertSame;
+
+/**
+ * Handles incoming container pushed by other datanode.
+ */
+class SendContainerRequestHandler
+    implements StreamObserver<SendContainerRequest> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SendContainerRequestHandler.class);
+
+  private final ContainerImporter importer;
+  private final StreamObserver<SendContainerResponse> responseObserver;
+
+  private long containerId = -1;
+  private long nextOffset;
+  private OutputStream output;
+  private HddsVolume volume;
+  private Path path;
+
+  SendContainerRequestHandler(
+      ContainerImporter importer,
+      StreamObserver<SendContainerResponse> responseObserver) {
+    this.importer = importer;
+    this.responseObserver = responseObserver;
+  }
+
+  @Override
+  public void onNext(SendContainerRequest req) {
+    try {
+      final long length = req.getData().size();
+      LOG.info("Received part for container id:{} offset:{} len:{}",
+          req.getContainerID(), req.getOffset(), length);
+
+      assertSame(nextOffset, req.getOffset(), "offset");
+
+      if (containerId == -1) {
+        containerId = req.getContainerID();
+        volume = importer.chooseNextVolume();
+        Path dir = ContainerImporter.getUntarDirectory(volume);
+        Files.createDirectories(dir);
+        path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
+        output = Files.newOutputStream(path);
+      }
+
+      assertSame(containerId, req.getContainerID(), "containerID");
+
+      req.getData().writeTo(output);
+
+      nextOffset += length;
+    } catch (Throwable t) {
+      onError(t);
+    }
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    LOG.error("Error", t);
+    closeOutput();
+    deleteTarball();
+    responseObserver.onError(t);
+  }
+
+  @Override
+  public void onCompleted() {
+    if (output == null) {
+      LOG.warn("Received container without any parts");
+      return;
+    }
+
+    LOG.info("Received all parts for container {}", containerId);
+    closeOutput();
+
+    try {
+      importer.importContainer(containerId, path, volume);
+      LOG.info("Imported container {}", containerId);
+      responseObserver.onNext(SendContainerResponse.newBuilder().build());
+      responseObserver.onCompleted();
+    } catch (Throwable t) {
+      LOG.info("Failed to import container {}", containerId, t);
+      deleteTarball();
+      responseObserver.onError(t);
+    }
+  }
+
+  private void closeOutput() {
+    IOUtils.cleanupWithLogger(LOG, output);
+    output = null;
+  }
+
+  private void deleteTarball() {
+    try {
+      Files.deleteIfExists(path);
+    } catch (IOException e) {
+      LOG.warn("Error removing {}", path);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 329fd64fa2..0e81e512d3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -37,9 +37,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator.CONTAINER_COPY_DIR;
-
-
 /**
  * Simple ContainerDownloaderImplementation to download the missing container
  * from the first available datanode.
@@ -70,7 +67,7 @@ public class SimpleContainerDownloader implements 
ContainerDownloader {
 
     if (downloadDir == null) {
       downloadDir = Paths.get(System.getProperty("java.io.tmpdir"))
-              .resolve(CONTAINER_COPY_DIR);
+              .resolve(ContainerImporter.CONTAINER_COPY_DIR);
     }
 
     final List<DatanodeDetails> shuffledDatanodes =
@@ -118,9 +115,9 @@ public class SimpleContainerDownloader implements 
ContainerDownloader {
     GrpcReplicationClient grpcReplicationClient =
         new GrpcReplicationClient(datanode.getIpAddress(),
             datanode.getPort(Name.REPLICATION).getValue(),
-            downloadDir, securityConfig, certClient, compression);
+            securityConfig, certClient, compression);
 
-    result = grpcReplicationClient.download(containerId)
+    result = grpcReplicationClient.download(containerId, downloadDir)
         .whenComplete((r, ex) -> {
           try {
             grpcReplicationClient.close();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index 3f5959a281..e090df42fe 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -33,29 +34,48 @@ import org.apache.hadoop.hdds.protocol.proto
 
 import com.google.common.base.Preconditions;
 
+import static java.util.Collections.emptyList;
+
 /**
  * SCM command to request replication of a container.
  */
-public class ReplicateContainerCommand
+public final class ReplicateContainerCommand
     extends SCMCommand<ReplicateContainerCommandProto> {
 
   private final long containerID;
   private final List<DatanodeDetails> sourceDatanodes;
+  private final DatanodeDetails targetDatanode;
   private int replicaIndex = 0;
 
-  public ReplicateContainerCommand(long containerID,
+  public static ReplicateContainerCommand fromSources(long containerID,
       List<DatanodeDetails> sourceDatanodes) {
-    super();
+    return new ReplicateContainerCommand(containerID, sourceDatanodes, null);
+  }
+
+  public static ReplicateContainerCommand toTarget(long containerID,
+      DatanodeDetails target) {
+    return new ReplicateContainerCommand(containerID, emptyList(), target);
+  }
+
+  public static ReplicateContainerCommand forTest(long containerID) {
+    return new ReplicateContainerCommand(containerID, emptyList(), null);
+  }
+
+  private ReplicateContainerCommand(long containerID,
+      List<DatanodeDetails> sourceDatanodes, DatanodeDetails target) {
     this.containerID = containerID;
     this.sourceDatanodes = sourceDatanodes;
+    this.targetDatanode = target;
   }
 
   // Should be called only for protobuf conversion
-  public ReplicateContainerCommand(long containerID,
-      List<DatanodeDetails> sourceDatanodes, long id) {
+  private ReplicateContainerCommand(long containerID,
+      List<DatanodeDetails> sourceDatanodes, long id,
+      DatanodeDetails targetDatanode) {
     super(id);
     this.containerID = containerID;
     this.sourceDatanodes = sourceDatanodes;
+    this.targetDatanode = targetDatanode;
   }
 
   public void setReplicaIndex(int index) {
@@ -76,6 +96,9 @@ public class ReplicateContainerCommand
       builder.addSources(dd.getProtoBufMessage());
     }
     builder.setReplicaIndex(replicaIndex);
+    if (targetDatanode != null) {
+      builder.setTarget(targetDatanode.getProtoBufMessage());
+    }
     return builder.build();
   }
 
@@ -83,15 +106,19 @@ public class ReplicateContainerCommand
       ReplicateContainerCommandProto protoMessage) {
     Preconditions.checkNotNull(protoMessage);
 
-    List<DatanodeDetails> datanodeDetails =
-        protoMessage.getSourcesList()
-            .stream()
+    List<DatanodeDetailsProto> sources = protoMessage.getSourcesList();
+    List<DatanodeDetails> sourceNodes = !sources.isEmpty()
+        ? sources.stream()
             .map(DatanodeDetails::getFromProtoBuf)
-            .collect(Collectors.toList());
+            .collect(Collectors.toList())
+        : emptyList();
+    DatanodeDetails targetNode = protoMessage.hasTarget()
+        ? DatanodeDetails.getFromProtoBuf(protoMessage.getTarget())
+        : null;
 
     ReplicateContainerCommand cmd =
         new ReplicateContainerCommand(protoMessage.getContainerID(),
-            datanodeDetails, protoMessage.getCmdId());
+            sourceNodes, protoMessage.getCmdId(), targetNode);
     if (protoMessage.hasReplicaIndex()) {
       cmd.setReplicaIndex(protoMessage.getReplicaIndex());
     }
@@ -106,6 +133,10 @@ public class ReplicateContainerCommand
     return sourceDatanodes;
   }
 
+  public DatanodeDetails getTargetDatanode() {
+    return targetDatanode;
+  }
+
   public int getReplicaIndex() {
     return replicaIndex;
   }
@@ -116,7 +147,11 @@ public class ReplicateContainerCommand
     sb.append(getType());
     sb.append(": containerId: ").append(getContainerID());
     sb.append(", replicaIndex: ").append(getReplicaIndex());
-    sb.append(", sourceNodes: ").append(sourceDatanodes);
+    if (targetDatanode != null) {
+      sb.append(", targetNode: ").append(targetDatanode);
+    } else {
+      sb.append(", sourceNodes: ").append(sourceDatanodes);
+    }
     return sb.toString();
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 331d9f2bd9..e05fe45229 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.container.common.statemachine;
 
 import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
-import static java.util.Collections.emptyList;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
 import static org.apache.ozone.test.GenericTestUtils.waitFor;
@@ -615,10 +614,10 @@ public class TestStateContext {
   @Test
   public void testCommandQueueSummary() throws IOException {
     StateContext ctx = createSubject();
-    ctx.addCommand(new ReplicateContainerCommand(1, null));
+    ctx.addCommand(ReplicateContainerCommand.forTest(1));
     ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
-    ctx.addCommand(new ReplicateContainerCommand(2, null));
-    ctx.addCommand(new ReplicateContainerCommand(3, null));
+    ctx.addCommand(ReplicateContainerCommand.forTest(2));
+    ctx.addCommand(ReplicateContainerCommand.forTest(3));
     ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
     ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
 
@@ -682,7 +681,7 @@ public class TestStateContext {
   }
 
   private static SCMCommand<?> someCommand() {
-    return new ReplicateContainerCommand(1, emptyList());
+    return ReplicateContainerCommand.forTest(1);
   }
 
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
similarity index 88%
rename from 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
rename to 
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index 0da88e1645..e342ffb338 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.junit.jupiter.api.BeforeEach;
@@ -44,23 +43,30 @@ import static org.mockito.Mockito.verify;
  * Tests for {@code GrpcOutputStream}.
  */
 @ExtendWith(MockitoExtension.class)
-public class TestGrpcOutputStream {
+abstract class GrpcOutputStreamTest<T> {
 
   private static final Random RND = new Random();
 
   private final long containerId = RND.nextLong();
   private final int bufferSize = RND.nextInt(1024) + 128 + 1;
+  private final Class<? extends T> clazz;
 
   @Mock
-  private StreamObserver<CopyContainerResponseProto> observer;
+  private StreamObserver<T> observer;
 
   private OutputStream subject;
 
+  protected GrpcOutputStreamTest(Class<? extends T> clazz) {
+    this.clazz = clazz;
+  }
+
   @BeforeEach
-  public void setUp() throws Exception {
-    subject = new GrpcOutputStream(observer, containerId, bufferSize);
+  public void setUp() {
+    subject = createSubject();
   }
 
+  protected abstract OutputStream createSubject();
+
   @Test
   public void seriesOfBytesInSingleResponse() throws IOException {
     byte[] bytes = getRandomBytes(5);
@@ -158,25 +164,19 @@ public class TestGrpcOutputStream {
       expectedResponseCount++;
     }
 
-    ArgumentCaptor<CopyContainerResponseProto> captor =
-        ArgumentCaptor.forClass(CopyContainerResponseProto.class);
+    ArgumentCaptor<T> captor =
+        ArgumentCaptor.forClass(clazz);
     verify(observer, times(expectedResponseCount)).onNext(captor.capture());
 
-    List<CopyContainerResponseProto> responses =
+    List<T> responses =
         new ArrayList<>(captor.getAllValues());
     for (int i = 0; i < expectedResponseCount; i++) {
-      CopyContainerResponseProto response = responses.get(i);
-      assertEquals(containerId, response.getContainerID());
-
+      T response = responses.get(i);
       int expectedOffset = i * bufferSize;
-      assertEquals(expectedOffset, response.getReadOffset());
-
       int size = Math.min(bufferSize, bytes.length - expectedOffset);
-      assertEquals(size, response.getLen());
-
       byte[] part = new byte[size];
       System.arraycopy(bytes, expectedOffset, part, 0, size);
-      ByteString data = response.getData();
+      ByteString data = verifyPart(response, expectedOffset, size);
       assertArrayEquals(part, data.toByteArray());
 
       // we don't want concatenated ByteStrings
@@ -186,6 +186,9 @@ public class TestGrpcOutputStream {
     verify(observer, times(1)).onCompleted();
   }
 
+  protected abstract ByteString verifyPart(
+      T response, int expectedOffset, int size);
+
   private static byte[] concat(byte[]... parts) {
     int length = Arrays.stream(parts).mapToInt(each -> each.length).sum();
     byte[] bytes = new byte[length];
@@ -210,4 +213,15 @@ public class TestGrpcOutputStream {
     return bytes;
   }
 
+  long getContainerId() {
+    return containerId;
+  }
+
+  StreamObserver<T> getObserver() {
+    return observer;
+  }
+
+  int getBufferSize() {
+    return bufferSize;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
index 613f463185..11c362c8ab 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
@@ -34,6 +34,8 @@ import 
org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.fromSources;
+
 /**
  * Helper to check scheduling efficiency.
  * <p>
@@ -113,7 +115,7 @@ public class ReplicationSupervisorScheduling {
             }
           }
 
-        }, replicationConfig, Clock.system(ZoneId.systemDefault()));
+        }, null, replicationConfig, Clock.system(ZoneId.systemDefault()));
 
     final long start = System.currentTimeMillis();
 
@@ -121,7 +123,7 @@ public class ReplicationSupervisorScheduling {
     for (int i = 0; i < 100; i++) {
       List<DatanodeDetails> sources = new ArrayList<>();
       sources.add(datanodes.get(random.nextInt(datanodes.size())));
-      rs.addTask(new ReplicationTask(i, sources));
+      rs.addTask(new ReplicationTask(fromSources(i, sources)));
     }
     rs.shutdownAfterFinish();
     final long executionTime = System.currentTimeMillis() - start;
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerResponseStream.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerResponseStream.java
new file mode 100644
index 0000000000..54fc9b7e57
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerResponseStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for {@link CopyContainerResponseStream}.
+ */
+/**
+ * Test for {@link CopyContainerResponseStream}.
+ */
+class TestCopyContainerResponseStream
+    extends GrpcOutputStreamTest<CopyContainerResponseProto> {
+
+  TestCopyContainerResponseStream() {
+    super(CopyContainerResponseProto.class);
+  }
+
+  @Override
+  protected OutputStream createSubject() {
+    return new CopyContainerResponseStream(getObserver(),
+        getContainerId(), getBufferSize());
+  }
+
+  /** Checks per-part metadata and returns the payload for byte comparison. */
+  @Override
+  protected ByteString verifyPart(CopyContainerResponseProto response,
+      int expectedOffset, int size) {
+    assertEquals(getContainerId(), response.getContainerID());
+    assertEquals(expectedOffset, response.getReadOffset());
+    assertEquals(size, response.getLen());
+    return response.getData();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
index 4f7bd048f9..987c47835d 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.container.replication;
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
@@ -28,6 +27,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.forTest;
+
 /**
  * Test replicator metric measurement.
  */
@@ -63,9 +64,9 @@ public class TestMeasuredReplicator {
   @Test
   public void measureFailureSuccessAndBytes() {
     //WHEN
-    measuredReplicator.replicate(new ReplicationTask(1L, new ArrayList<>()));
-    measuredReplicator.replicate(new ReplicationTask(2L, new ArrayList<>()));
-    measuredReplicator.replicate(new ReplicationTask(3L, new ArrayList<>()));
+    measuredReplicator.replicate(new ReplicationTask(forTest(1)));
+    measuredReplicator.replicate(new ReplicationTask(forTest(2)));
+    measuredReplicator.replicate(new ReplicationTask(forTest(3)));
 
     //THEN
     //even containers should be failed
@@ -83,9 +84,9 @@ public class TestMeasuredReplicator {
   public void testReplicationTime() throws Exception {
     //WHEN
     //will wait at least the 300ms
-    measuredReplicator.replicate(new ReplicationTask(101L, new ArrayList<>()));
-    measuredReplicator.replicate(new ReplicationTask(201L, new ArrayList<>()));
-    measuredReplicator.replicate(new ReplicationTask(300L, new ArrayList<>()));
+    measuredReplicator.replicate(new ReplicationTask(forTest(101)));
+    measuredReplicator.replicate(new ReplicationTask(forTest(201)));
+    measuredReplicator.replicate(new ReplicationTask(forTest(300)));
 
     //THEN
     //even containers should be failed
@@ -101,7 +102,7 @@ public class TestMeasuredReplicator {
   public void testFailureTimeSuccessExcluded() {
     //WHEN
     //will wait at least the 15ms
-    measuredReplicator.replicate(new ReplicationTask(15L, new ArrayList<>()));
+    measuredReplicator.replicate(new ReplicationTask(forTest(15)));
 
     //THEN
     //even containers should be failed, supposed to be zero
@@ -112,7 +113,7 @@ public class TestMeasuredReplicator {
   public void testSuccessTimeFailureExcluded() {
     //WHEN
     //will wait at least the 10ms
-    measuredReplicator.replicate(new ReplicationTask(10L, new ArrayList<>()));
+    measuredReplicator.replicate(new ReplicationTask(forTest(10)));
 
     //THEN
     //even containers should be failed, supposed to be zero
@@ -122,7 +123,7 @@ public class TestMeasuredReplicator {
   @Test
   public void testReplicationQueueTimeMetrics() {
     final Instant queued = Instant.now().minus(1, ChronoUnit.SECONDS);
-    ReplicationTask task = new ReplicationTask(100L, new ArrayList<>()) {
+    ReplicationTask task = new ReplicationTask(forTest(100)) {
       @Override
       public Instant getQueued() {
         return queued;
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 2ea47f5972..6bc58d2270 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -82,8 +82,12 @@ public class TestReplicationSupervisor {
   };
   private final AtomicReference<ContainerReplicator> replicatorRef =
       new AtomicReference<>();
-  private final ContainerReplicator mutableReplicator =
+  private final AtomicReference<ContainerReplicator> pushReplicatorRef =
+      new AtomicReference<>();
+  private final ContainerReplicator pullReplicator =
       task -> replicatorRef.get().replicate(task);
+  private final ContainerReplicator pushReplicator =
+      task -> pushReplicatorRef.get().replicate(task);
 
   private ContainerSet set;
 
@@ -251,8 +255,8 @@ public class TestReplicationSupervisor {
   @Test
   public void testDownloadAndImportReplicatorFailure() throws IOException {
     ReplicationSupervisor supervisor =
-        new ReplicationSupervisor(set, context, mutableReplicator,
-            newDirectExecutorService(), clock);
+        new ReplicationSupervisor(set, context, pullReplicator,
+            pushReplicator, newDirectExecutorService(), clock);
 
     OzoneConfiguration conf = new OzoneConfiguration();
     // Mock to fetch an exception in the importContainer method.
@@ -271,10 +275,12 @@ public class TestReplicationSupervisor {
     Mockito.when(volumeSet.getVolumesList())
         .thenReturn(Collections.singletonList(
             new HddsVolume.Builder(testDir).conf(conf).build()));
-    ContainerReplicator replicatorFactory =
-        new DownloadAndImportReplicator(conf, set, null, moc, null, volumeSet);
+    ContainerImporter importer =
+        new ContainerImporter(conf, set, null, null, volumeSet);
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(importer, moc);
 
-    replicatorRef.set(replicatorFactory);
+    replicatorRef.set(replicator);
 
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(DownloadAndImportReplicator.LOG);
@@ -340,8 +346,8 @@ public class TestReplicationSupervisor {
       Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory,
       ExecutorService executor) {
     ReplicationSupervisor supervisor =
-        new ReplicationSupervisor(set, context, mutableReplicator, executor,
-            clock);
+        new ReplicationSupervisor(set, context, pullReplicator, pushReplicator,
+            executor, clock);
     replicatorRef.set(replicatorFactory.apply(supervisor));
     return supervisor;
   }
@@ -353,7 +359,7 @@ public class TestReplicationSupervisor {
 
   private static ReplicateContainerCommand createCommand(long containerId) {
     ReplicateContainerCommand cmd =
-        new ReplicateContainerCommand(containerId, emptyList());
+        ReplicateContainerCommand.forTest(containerId);
     cmd.setTerm(CURRENT_TERM);
     return cmd;
   }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index b73c04d399..85c7bc0f08 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -40,10 +40,9 @@ import 
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointT
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import 
org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
-import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
 import 
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -666,11 +665,10 @@ public class TestDatanodeUpgradeToScmHA {
    * {@code containerID}.
    */
   public void importContainer(long containerID, File source) throws Exception {
-    DownloadAndImportReplicator replicator =
-        new DownloadAndImportReplicator(dsm.getConf(),
+    ContainerImporter replicator =
+        new ContainerImporter(dsm.getConf(),
             dsm.getContainer().getContainerSet(),
             dsm.getContainer().getController(),
-            new SimpleContainerDownloader(conf, null),
             new TarContainerPacker(), dsm.getContainer().getVolumeSet());
 
     File tempFile = tempFolder.newFile(
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 1b7fcad140..7312fd6a9e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -502,6 +502,16 @@ message CopyContainerResponseProto {
   optional int64 checksum = 6;
 }
 
+message SendContainerRequest {
+  required int64 containerID = 1;
+  required uint64 offset = 2;
+  required bytes data = 3;
+  optional int64 checksum = 4;
+}
+
+message SendContainerResponse {
+}
+
 service XceiverClientProtocolService {
   // A client-to-datanode RPC to send container commands
   rpc send(stream ContainerCommandRequestProto) returns
@@ -512,4 +522,5 @@ service XceiverClientProtocolService {
 service IntraDatanodeProtocolService {
   // An intradatanode service to copy the raw container data between nodes
   rpc download (CopyContainerRequestProto) returns (stream 
CopyContainerResponseProto);
+  rpc upload (stream SendContainerRequest) returns (SendContainerResponse);
 }
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 39d6a2931e..b36c3c89ff 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -415,6 +415,7 @@ message ReplicateContainerCommandProto {
   repeated DatanodeDetailsProto sources = 2;
   required int64 cmdId = 3;
   optional int32 replicaIndex = 4;
+  optional DatanodeDetailsProto target = 5;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
index 5998c93134..129bddffb3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -37,8 +36,8 @@ import java.util.Set;
 public class ECMisReplicationHandler extends MisReplicationHandler {
   public ECMisReplicationHandler(
           PlacementPolicy<ContainerReplica> containerPlacement,
-          ConfigurationSource conf, NodeManager nodeManager) {
-    super(containerPlacement, conf, nodeManager);
+          ConfigurationSource conf, NodeManager nodeManager, boolean push) {
+    super(containerPlacement, conf, nodeManager, push);
   }
 
   @Override
@@ -56,15 +55,12 @@ public class ECMisReplicationHandler extends 
MisReplicationHandler {
   }
 
   @Override
-  protected ReplicateContainerCommand getReplicateCommand(
-          ContainerInfo containerInfo, ContainerReplica replica) {
-    final ReplicateContainerCommand replicateCommand =
-            new ReplicateContainerCommand(containerInfo.getContainerID(),
-            Collections.singletonList(replica.getDatanodeDetails()));
+  protected ReplicateContainerCommand updateReplicateCommand(
+          ReplicateContainerCommand command, ContainerReplica replica) {
     // For EC containers, we need to track the replica index which is
     // to be replicated, so add it to the command.
-    replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-    return replicateCommand;
+    command.setReplicaIndex(replica.getReplicaIndex());
+    return command;
   }
 
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index a1e7f1a73f..49b29bd46c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -356,16 +356,7 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
                   replicas, selectedDatanodes, excludedNodes, decomIndexes);
               break;
             }
-            DatanodeDetails decommissioningSrcNode
-                = replica.getDatanodeDetails();
-            final ReplicateContainerCommand replicateCommand =
-                new ReplicateContainerCommand(container.getContainerID(),
-                    ImmutableList.of(decommissioningSrcNode));
-            // For EC containers, we need to track the replica index which is
-            // to be replicated, so add it to the command.
-            replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-            DatanodeDetails target = iterator.next();
-            commands.put(target, replicateCommand);
+            createReplicateCommand(commands, container, iterator, replica);
           }
         }
       }
@@ -414,21 +405,30 @@ public class ECUnderReplicationHandler implements 
UnhealthyReplicationHandler {
               replicas, targets, excludedNodes, maintIndexes);
           break;
         }
-        DatanodeDetails maintenanceSourceNode = replica.getDatanodeDetails();
-        final ReplicateContainerCommand replicateCommand =
-            new ReplicateContainerCommand(
-                container.containerID().getProtobuf().getId(),
-                ImmutableList.of(maintenanceSourceNode));
-        // For EC containers we need to track the replica index which is
-        // to be replicated, so add it to the command.
-        replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-        DatanodeDetails target = iterator.next();
-        commands.put(target, replicateCommand);
+        createReplicateCommand(commands, container, iterator, replica);
         additionalMaintenanceCopiesNeeded -= 1;
       }
     }
   }
 
+  private void createReplicateCommand(
+      Map<DatanodeDetails, SCMCommand<?>> commands,
+      ContainerInfo container, Iterator<DatanodeDetails> iterator,
+      ContainerReplica replica) {
+    final boolean push = replicationManager.getConfig().isPush();
+    DatanodeDetails source = replica.getDatanodeDetails();
+    DatanodeDetails target = iterator.next();
+    final long containerID = container.getContainerID();
+    final ReplicateContainerCommand replicateCommand = push
+        ? ReplicateContainerCommand.toTarget(containerID, target)
+        : ReplicateContainerCommand.fromSources(containerID,
+            ImmutableList.of(source));
+    // For EC containers, we need to track the replica index which is
+    // to be replicated, so add it to the command.
+    replicateCommand.setReplicaIndex(replica.getReplicaIndex());
+    commands.put(push ? source : target, replicateCommand);
+  }
+
   private static byte[] int2byte(List<Integer> src) {
     byte[] dst = new byte[src.size()];
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 80a61ee5db..3e3a34c0b4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -1474,21 +1474,24 @@ public class LegacyReplicationManager {
    * datanode.
    *
    * @param container Container to be replicated
-   * @param datanode The destination datanode to replicate
+   * @param target The destination datanode to replicate
    * @param sources List of source nodes from where we can replicate
    */
   private void sendReplicateCommand(final ContainerInfo container,
-                                    final DatanodeDetails datanode,
+                                    final DatanodeDetails target,
                                     final List<DatanodeDetails> sources) {
 
-    LOG.info("Sending replicate container command for container {}" +
-            " to datanode {} from datanodes {}",
-        container.containerID(), datanode, sources);
-
     final ContainerID id = container.containerID();
-    final ReplicateContainerCommand replicateCommand =
-        new ReplicateContainerCommand(id.getId(), sources);
-    final boolean sent = sendAndTrackDatanodeCommand(datanode, replicateCommand,
+    final boolean push = rmConf.isPush();
+    final ReplicateContainerCommand replicateCommand = push
+        ? ReplicateContainerCommand.toTarget(containerID, target)
+        : ReplicateContainerCommand.fromSources(containerID, sources);
+    final DatanodeDetails source = sources.get(0); // TODO randomize
+    final DatanodeDetails receiver = push ? source : target;
+    LOG.info("Sending {} to {}", replicateCommand, receiver);
+
+    final boolean sent = sendAndTrackDatanodeCommand(receiver, replicateCommand,
         action -> addInflight(InflightType.REPLICATION, id, action));
 
     if (sent) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index b418b9236e..3570f45eb7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -58,15 +58,18 @@ public abstract class MisReplicationHandler implements
   private final PlacementPolicy<ContainerReplica> containerPlacement;
   private final long currentContainerSize;
   private final NodeManager nodeManager;
+  private boolean push;
 
   public MisReplicationHandler(
           final PlacementPolicy<ContainerReplica> containerPlacement,
-          final ConfigurationSource conf, NodeManager nodeManager) {
+          final ConfigurationSource conf, NodeManager nodeManager,
+      final boolean push) {
     this.containerPlacement = containerPlacement;
     this.currentContainerSize = (long) conf.getStorageSize(
             ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
             ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     this.nodeManager = nodeManager;
+    this.push = push;
   }
 
   protected abstract ContainerReplicaCount getContainerReplicaCount(
@@ -108,8 +111,8 @@ public abstract class MisReplicationHandler implements
         .collect(Collectors.toSet());
   }
 
-  protected abstract ReplicateContainerCommand getReplicateCommand(
-          ContainerInfo containerInfo, ContainerReplica replica);
+  protected abstract ReplicateContainerCommand updateReplicateCommand(
+          ReplicateContainerCommand command, ContainerReplica replica);
 
   private Map<DatanodeDetails, SCMCommand<?>> getReplicateCommands(
           ContainerInfo containerInfo,
@@ -121,8 +124,15 @@ public abstract class MisReplicationHandler implements
       if (datanodeIdx == targetDns.size()) {
         break;
       }
-      commandMap.put(targetDns.get(datanodeIdx),
-              getReplicateCommand(containerInfo, replica));
+      long containerID = containerInfo.getContainerID();
+      DatanodeDetails source = replica.getDatanodeDetails();
+      DatanodeDetails target = targetDns.get(datanodeIdx);
+      ReplicateContainerCommand replicateCommand = push
+          ? ReplicateContainerCommand.toTarget(containerID, target)
+          : ReplicateContainerCommand.fromSources(containerID,
+              Collections.singletonList(source));
+      replicateCommand = updateReplicateCommand(replicateCommand, replica);
+      commandMap.put(push ? source : target, replicateCommand);
       datanodeIdx += 1;
     }
     return commandMap;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
index 7dfe50e955..d3652a635f 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -39,8 +38,8 @@ public class RatisMisReplicationHandler extends 
MisReplicationHandler {
 
   public RatisMisReplicationHandler(
           PlacementPolicy<ContainerReplica> containerPlacement,
-          ConfigurationSource conf, NodeManager nodeManager) {
-    super(containerPlacement, conf, nodeManager);
+          ConfigurationSource conf, NodeManager nodeManager, boolean push) {
+    super(containerPlacement, conf, nodeManager, push);
   }
 
   @Override
@@ -70,9 +69,8 @@ public class RatisMisReplicationHandler extends 
MisReplicationHandler {
   }
 
   @Override
-  protected ReplicateContainerCommand getReplicateCommand(
-          ContainerInfo containerInfo, ContainerReplica replica) {
-    return new ReplicateContainerCommand(containerInfo.getContainerID(),
-            Collections.singletonList(replica.getDatanodeDetails()));
+  protected ReplicateContainerCommand updateReplicateCommand(
+          ReplicateContainerCommand command, ContainerReplica replica) {
+    return command;
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index e5b2fe8bda..d9d96a566c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,14 +54,17 @@ public class RatisUnderReplicationHandler
   private final PlacementPolicy placementPolicy;
   private final NodeManager nodeManager;
   private final long currentContainerSize;
+  private final ReplicationManager replicationManager;
 
   public RatisUnderReplicationHandler(final PlacementPolicy placementPolicy,
-      final ConfigurationSource conf, final NodeManager nodeManager) {
+      final ConfigurationSource conf, final NodeManager nodeManager,
+      final ReplicationManager replicationManager) {
     this.placementPolicy = placementPolicy;
     this.currentContainerSize = (long) conf
         .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
             ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     this.nodeManager = nodeManager;
+    this.replicationManager = replicationManager;
   }
 
   /**
@@ -230,11 +234,29 @@ public class RatisUnderReplicationHandler
   private Map<DatanodeDetails, SCMCommand<?>> createReplicationCommands(
       long containerID, List<DatanodeDetails> sources,
       List<DatanodeDetails> targets) {
+    final boolean push = replicationManager.getConfig().isPush()
+        && targets.size() <= sources.size();
+    // TODO if we need multiple commands per source datanode, we need a small
+    // interface change
     Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
-    for (DatanodeDetails target : targets) {
-      ReplicateContainerCommand command =
-          new ReplicateContainerCommand(containerID, sources);
-      commands.put(target, command);
+
+    if (push) {
+      Collections.shuffle(sources);
+      for (Iterator<DatanodeDetails> srcIter = sources.iterator(),
+              targetIter = targets.iterator();
+          srcIter.hasNext() && targetIter.hasNext();) {
+        DatanodeDetails source = srcIter.next();
+        DatanodeDetails target = targetIter.next();
+        ReplicateContainerCommand command =
+            ReplicateContainerCommand.toTarget(containerID, target);
+        commands.put(source, command);
+      }
+    } else {
+      for (DatanodeDetails target : targets) {
+        ReplicateContainerCommand command =
+            ReplicateContainerCommand.fromSources(containerID, sources);
+        commands.put(target, command);
+      }
     }
 
     return commands;
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 535f0dde9d..911058f250 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -79,6 +79,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
@@ -225,9 +226,9 @@ public class ReplicationManager implements SCMService {
     ecOverReplicationHandler =
         new ECOverReplicationHandler(ecContainerPlacement, nodeManager);
     ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement,
-        conf, nodeManager);
+        conf, nodeManager, rmConf.isPush());
     ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
-        ratisContainerPlacement, conf, nodeManager);
+        ratisContainerPlacement, conf, nodeManager, this);
     ratisOverReplicationHandler =
         new RatisOverReplicationHandler(ratisContainerPlacement, nodeManager);
     underReplicatedProcessor =
@@ -861,6 +862,16 @@ public class ReplicationManager implements SCMService {
     )
     private int maintenanceRemainingRedundancy = 1;
 
+    @Config(key = "push",
+        type = ConfigType.BOOLEAN,
+        defaultValue = "false",
+        tags = { SCM, DATANODE },
+        description = "If false, replication happens by asking the target to " +
+            "pull from source nodes.  If true, the source node is asked to " +
+            "push to the target node."
+    )
+    private boolean push;
+
     @PostConstruct
     public void validate() {
       if (!(commandDeadlineFactor > 0) || (commandDeadlineFactor > 1)) {
@@ -905,6 +916,10 @@ public class ReplicationManager implements SCMService {
     public int getMaintenanceReplicaMinimum() {
       return maintenanceReplicaMinimum;
     }
+
+    public boolean isPush() {
+      return push;
+    }
   }
 
   @Override
@@ -952,6 +967,10 @@ public class ReplicationManager implements SCMService {
     return metrics;
   }
 
+  public ReplicationManagerConfiguration getConfig() {
+    return rmConf;
+  }
+
 
   /**
   * following functions will be refactored in a separate jira.
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index 3332fc4e00..02cd592fe2 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -169,6 +169,7 @@ public class TestECMisReplicationHandler extends 
TestMisReplicationHandler {
   protected MisReplicationHandler getMisreplicationHandler(
           PlacementPolicy placementPolicy, OzoneConfiguration conf,
           NodeManager nodeManager) {
-    return new ECMisReplicationHandler(placementPolicy, conf, nodeManager);
+    return new ECMisReplicationHandler(placementPolicy, conf, nodeManager,
+        false);
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 58989daa48..28961646e6 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -97,6 +97,10 @@ public class TestECUnderReplicationHandler {
       }
     };
     replicationManager = Mockito.mock(ReplicationManager.class);
+    ReplicationManager.ReplicationManagerConfiguration rmConf =
+        new ReplicationManager.ReplicationManagerConfiguration();
+    Mockito.when(replicationManager.getConfig())
+        .thenReturn(rmConf);
     conf = SCMTestUtils.getConf();
     repConfig = new ECReplicationConfig(DATA, PARITY);
     container = ReplicationTestUtil
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
index f960974256..874b4db2ba 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
@@ -173,6 +173,7 @@ public class TestRatisMisReplicationHandler extends 
TestMisReplicationHandler {
   protected MisReplicationHandler getMisreplicationHandler(
           PlacementPolicy placementPolicy, OzoneConfiguration conf,
           NodeManager nodeManager) {
-    return new RatisMisReplicationHandler(placementPolicy, conf, nodeManager);
+    return new RatisMisReplicationHandler(placementPolicy, conf, nodeManager,
+        false);
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index b2ccba1455..efed912e74 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -61,6 +62,7 @@ public class TestRatisUnderReplicationHandler {
   private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
       RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
   private PlacementPolicy policy;
+  private ReplicationManager replicationManager;
 
   @Before
   public void setup() throws NodeNotFoundException {
@@ -71,6 +73,9 @@ public class TestRatisUnderReplicationHandler {
     conf = SCMTestUtils.getConf();
     policy = ReplicationTestUtil
         .getSimpleTestPlacementPolicy(nodeManager, conf);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    Mockito.when(replicationManager.getConfig())
+        .thenReturn(new ReplicationManagerConfiguration());
 
     /*
      Return NodeStatus with NodeOperationalState as specified in
@@ -183,7 +188,8 @@ public class TestRatisUnderReplicationHandler {
     policy = ReplicationTestUtil.getNoNodesTestPlacementPolicy(nodeManager,
         conf);
     RatisUnderReplicationHandler handler =
-        new RatisUnderReplicationHandler(policy, conf, nodeManager);
+        new RatisUnderReplicationHandler(policy, conf, nodeManager,
+            replicationManager);
 
     Set<ContainerReplica> replicas
         = createReplicas(container.containerID(), State.CLOSED, 0, 0);
@@ -211,7 +217,8 @@ public class TestRatisUnderReplicationHandler {
       ContainerHealthResult healthResult,
       int minHealthyForMaintenance, int expectNumCommands) throws IOException {
     RatisUnderReplicationHandler handler =
-        new RatisUnderReplicationHandler(policy, conf, nodeManager);
+        new RatisUnderReplicationHandler(policy, conf, nodeManager,
+            replicationManager);
 
     Map<DatanodeDetails, SCMCommand<?>> commands =
         handler.processAndCreateCommands(replicas, pendingOps,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index dd4a9e385c..2a2e6c68e6 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -576,7 +576,7 @@ public class TestReplicationManager {
     sources.add(MockDatanodeDetails.randomDatanodeDetails());
 
 
-    ReplicateContainerCommand command = new ReplicateContainerCommand(
+    ReplicateContainerCommand command = ReplicateContainerCommand.fromSources(
         containerInfo.getContainerID(), sources);
     command.setReplicaIndex(1);
 
@@ -611,7 +611,7 @@ public class TestReplicationManager {
     containerInfo = ReplicationTestUtil.createContainerInfo(ratisRepConfig, 2,
         HddsProtos.LifeCycleState.CLOSED, 10, 20);
 
-    command = new ReplicateContainerCommand(
+    command = ReplicateContainerCommand.fromSources(
         containerInfo.getContainerID(), sources);
     replicationManager.sendDatanodeCommand(command, containerInfo, target);
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index 5a7900e8ec..a56030811d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -111,7 +111,7 @@ public class TestUnderReplicatedProcessor {
     List<DatanodeDetails> sourceDns = new ArrayList<>();
     sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
     DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
-    ReplicateContainerCommand rcc = new ReplicateContainerCommand(
+    ReplicateContainerCommand rcc = ReplicateContainerCommand.fromSources(
         container.getContainerID(), sourceDns);
     rcc.setReplicaIndex(3);
 
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index e241eff809..138e12e5f1 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -68,6 +68,7 @@ OZONE-SITE.XML_ozone.s3g.kerberos.principal=s3g/[email protected]
 
 OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
 OZONE-SITE.XML_hdds.scm.replication.event.timeout=10s
+OZONE-SITE.XML_hdds.scm.replication.push=true
 OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
 OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
 OZONE-SITE.XML_hdds.container.report.interval=60s
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
index cfa34fc445..4e329ad305 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -39,8 +38,7 @@ public class TestSCMDatanodeProtocolServer {
     OzoneStorageContainerManager scm =
         Mockito.mock(OzoneStorageContainerManager.class);
 
-    ReplicateContainerCommand command = new ReplicateContainerCommand(1L,
-        Collections.emptyList());
+    ReplicateContainerCommand command = ReplicateContainerCommand.forTest(1);
     command.setTerm(5L);
     command.setDeadline(1234L);
     StorageContainerDatanodeProtocolProtos.SCMCommandProto proto =
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 1c0ba2a80c..7eabe6884c 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer;
@@ -128,7 +129,7 @@ public class ClosedContainerReplicator extends 
BaseFreonGenerator implements
         //replica.
         if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
           replicationTasks.add(new ReplicationTask(
-              new ReplicateContainerCommand(container.getContainerID(),
+              ReplicateContainerCommand.fromSources(container.getContainerID(),
                   datanodesWithContainer)));
         }
       }
@@ -202,16 +203,16 @@ public class ClosedContainerReplicator extends 
BaseFreonGenerator implements
     ContainerController controller =
         new ContainerController(containerSet, handlers);
 
-    ContainerReplicator replicator =
-        new DownloadAndImportReplicator(conf, containerSet,
-            controller,
-            new SimpleContainerDownloader(conf, null),
-            new TarContainerPacker(), null);
+    ContainerImporter importer = new ContainerImporter(conf, containerSet,
+        controller, new TarContainerPacker(), null);
+    ContainerReplicator replicator = new DownloadAndImportReplicator(importer,
+        new SimpleContainerDownloader(conf, null));
 
     ReplicationServer.ReplicationConfig replicationConfig
         = conf.getObject(ReplicationServer.ReplicationConfig.class);
     supervisor = new ReplicationSupervisor(containerSet, null,
-        replicator, replicationConfig, Clock.system(ZoneId.systemDefault()));
+        replicator, null, replicationConfig,
+        Clock.system(ZoneId.systemDefault()));
   }
 
   private void replicateContainer(long counter) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to