This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new aed64de760 HDDS-7777. Implement container replication in push model
(#4197)
aed64de760 is described below
commit aed64de760b9860650d8bfb435c15e07f1d802af
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jan 24 12:32:11 2023 +0100
HDDS-7777. Implement container replication in push model (#4197)
---
.../common/statemachine/DatanodeStateMachine.java | 27 +++--
.../ReplicateContainerCommandHandler.java | 5 +-
.../container/keyvalue/KeyValueContainer.java | 4 +-
.../ozone/container/ozoneimpl/OzoneContainer.java | 6 +-
...mportReplicator.java => ContainerImporter.java} | 134 ++++++---------------
.../container/replication/ContainerUploader.java | 32 +++++
.../replication/CopyContainerResponseStream.java | 47 ++++++++
.../replication/DownloadAndImportReplicator.java | 110 ++---------------
.../replication/GrpcContainerUploader.java | 100 +++++++++++++++
.../container/replication/GrpcOutputStream.java | 41 ++++---
.../replication/GrpcReplicationClient.java | 16 +--
.../replication/GrpcReplicationService.java | 20 ++-
.../container/replication/PushReplicator.java | 70 +++++++++++
.../container/replication/ReplicationServer.java | 7 +-
.../replication/ReplicationSupervisor.java | 59 ++++-----
.../container/replication/ReplicationTask.java | 49 +++-----
.../replication/SendContainerOutputStream.java | 43 +++++++
.../replication/SendContainerRequestHandler.java | 131 ++++++++++++++++++++
.../replication/SimpleContainerDownloader.java | 9 +-
.../commands/ReplicateContainerCommand.java | 57 +++++++--
.../common/statemachine/TestStateContext.java | 9 +-
...OutputStream.java => GrpcOutputStreamTest.java} | 46 ++++---
.../ReplicationSupervisorScheduling.java | 6 +-
.../TestCopyContainerResponseStream.java | 50 ++++++++
.../replication/TestMeasuredReplicator.java | 21 ++--
.../replication/TestReplicationSupervisor.java | 24 ++--
.../upgrade/TestDatanodeUpgradeToScmHA.java | 8 +-
.../src/main/proto/DatanodeClientProtocol.proto | 11 ++
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 1 +
.../replication/ECMisReplicationHandler.java | 16 +--
.../replication/ECUnderReplicationHandler.java | 40 +++---
.../replication/LegacyReplicationManager.java | 21 ++--
.../replication/MisReplicationHandler.java | 20 ++-
.../replication/RatisMisReplicationHandler.java | 12 +-
.../replication/RatisUnderReplicationHandler.java | 32 ++++-
.../container/replication/ReplicationManager.java | 23 +++-
.../replication/TestECMisReplicationHandler.java | 3 +-
.../replication/TestECUnderReplicationHandler.java | 4 +
.../TestRatisMisReplicationHandler.java | 3 +-
.../TestRatisUnderReplicationHandler.java | 11 +-
.../replication/TestReplicationManager.java | 4 +-
.../replication/TestUnderReplicatedProcessor.java | 2 +-
.../src/main/compose/ozonesecure/docker-config | 1 +
.../hdds/scm/TestSCMDatanodeProtocolServer.java | 4 +-
.../ozone/freon/ClosedContainerReplicator.java | 15 +--
45 files changed, 913 insertions(+), 441 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 4b8f0be1e3..6da466d8c0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -60,9 +60,13 @@ import
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetri
import
org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
import
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.GrpcContainerUploader;
import org.apache.hadoop.ozone.container.replication.MeasuredReplicator;
+import
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
+import org.apache.hadoop.ozone.container.replication.PushReplicator;
import
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import
org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
@@ -171,19 +175,26 @@ public class DatanodeStateMachine implements Closeable {
dnCertClient = certClient;
nextHB = new AtomicLong(Time.monotonicNow());
- ContainerReplicator replicator =
- new DownloadAndImportReplicator(conf, container.getContainerSet(),
- container.getController(),
- new SimpleContainerDownloader(conf, dnCertClient),
- new TarContainerPacker(), container.getVolumeSet());
-
- replicatorMetrics = new MeasuredReplicator(replicator);
+ ContainerImporter importer = new ContainerImporter(conf,
+ container.getContainerSet(),
+ container.getController(),
+ new TarContainerPacker(), container.getVolumeSet());
+ ContainerReplicator pullReplicator = new DownloadAndImportReplicator(
+ importer,
+ new SimpleContainerDownloader(conf, dnCertClient));
+ ContainerReplicator pushReplicator = new PushReplicator(
+ // TODO compression, metrics
+ new OnDemandContainerReplicationSource(container.getController()),
+ new GrpcContainerUploader(conf, dnCertClient)
+ );
+
+ replicatorMetrics = new MeasuredReplicator(pullReplicator);
ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor =
new ReplicationSupervisor(container.getContainerSet(), context,
- replicatorMetrics, replicationConfig, clock);
+ replicatorMetrics, pushReplicator, replicationConfig, clock);
replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index df589e287d..6d281adc9c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -66,10 +66,11 @@ public class ReplicateContainerCommandHandler implements
CommandHandler {
final List<DatanodeDetails> sourceDatanodes =
replicateCommand.getSourceDatanodes();
final long containerID = replicateCommand.getContainerID();
+ final DatanodeDetails target = replicateCommand.getTargetDatanode();
- Preconditions.checkArgument(sourceDatanodes.size() > 0,
+ Preconditions.checkArgument(!sourceDatanodes.isEmpty() || target != null,
"Replication command is received for container %s "
- + "without source datanodes.", containerID);
+ + "without source or target datanodes.", containerID);
ReplicationTask task = new ReplicationTask(replicateCommand);
supervisor.addTask(task);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index e2bafc080b..e1ee881142 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -58,7 +58,7 @@ import
org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -518,7 +518,7 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
Path destContainerDir =
Paths.get(KeyValueContainerLocationUtil.getBaseContainerLocation(
hddsVolume.getHddsRootDir().toString(), idDir, containerId));
- Path tmpDir = DownloadAndImportReplicator.getUntarDirectory(hddsVolume);
+ Path tmpDir = ContainerImporter.getUntarDirectory(hddsVolume);
writeLock();
try {
//copy the values from the input stream to the final destination
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 5d2153465e..3c63aaa799 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -53,8 +53,10 @@ import
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import
org.apache.hadoop.ozone.container.common.volume.StorageVolume.VolumeType;
import org.apache.hadoop.ozone.container.common.volume.StorageVolumeChecker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
import
org.apache.hadoop.ozone.container.keyvalue.statemachine.background.StaleRecoveringContainerScrubbingService;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
import
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import
org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
@@ -204,7 +206,9 @@ public class OzoneContainer {
controller,
conf.getObject(ReplicationConfig.class),
secConf,
- certClient);
+ certClient,
+ new ContainerImporter(conf, containerSet, controller,
+ new TarContainerPacker(), volumeSet));
readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
similarity index 50%
copy from
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
copy to
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index c1d99c9148..59316a8c8f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -1,32 +1,24 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -39,58 +31,51 @@ import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingP
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
/**
- * Default replication implementation.
- * <p>
- * This class does the real job. Executes the download and import the container
- * to the container set.
+ * Imports container from tarball.
*/
-public class DownloadAndImportReplicator implements ContainerReplicator {
+public class ContainerImporter {
- public static final Logger LOG =
- LoggerFactory.getLogger(DownloadAndImportReplicator.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerImporter.class);
public static final String CONTAINER_COPY_DIR = "container-copy";
- public static final String CONTAINER_COPY_TMP_DIR = "tmp";
-
+ private static final String CONTAINER_COPY_TMP_DIR = "tmp";
private final ContainerSet containerSet;
private final ContainerController controller;
- private final ContainerDownloader downloader;
private final TarContainerPacker packer;
private final MutableVolumeSet volumeSet;
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long containerSize;
- public DownloadAndImportReplicator(
- ConfigurationSource conf,
- ContainerSet containerSet,
- ContainerController controller,
- ContainerDownloader downloader,
- TarContainerPacker packer,
+ public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
+ ContainerController controller, TarContainerPacker tarContainerPacker,
MutableVolumeSet volumeSet) {
this.containerSet = containerSet;
this.controller = controller;
- this.downloader = downloader;
- this.packer = packer;
+ this.packer = tarContainerPacker;
this.volumeSet = volumeSet;
try {
- this.volumeChoosingPolicy = conf.getClass(
+ volumeChoosingPolicy = conf.getClass(
HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
.class, VolumeChoosingPolicy.class).newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
- this.containerSize = (long) conf.getStorageSize(
+ containerSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
}
public void importContainer(long containerID, Path tarFilePath,
@@ -100,72 +85,33 @@ public class DownloadAndImportReplicator implements
ContainerReplicator {
if (targetVolume == null) {
targetVolume = chooseNextVolume();
}
- KeyValueContainerData originalContainerData;
try {
- try (FileInputStream tmpContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
+ KeyValueContainerData containerData;
+
+ try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
byte[] containerDescriptorYaml =
- packer.unpackContainerDescriptor(tmpContainerTarStream);
- originalContainerData = (KeyValueContainerData) ContainerDataYaml
+ packer.unpackContainerDescriptor(input);
+ containerData = (KeyValueContainerData) ContainerDataYaml
.readContainer(containerDescriptorYaml);
}
- originalContainerData.setVolume(targetVolume);
-
- try (FileInputStream tempContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
+ containerData.setVolume(targetVolume);
+ try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
Container container = controller.importContainer(
- originalContainerData, tempContainerTarStream, packer);
-
+ containerData, input, packer);
containerSet.addContainer(container);
}
-
} finally {
try {
Files.delete(tarFilePath);
} catch (Exception ex) {
- LOG.error("Got exception while deleting downloaded container file: "
- + tarFilePath.toAbsolutePath().toString(), ex);
- }
- }
- }
-
- @Override
- public void replicate(ReplicationTask task) {
- long containerID = task.getContainerId();
-
- List<DatanodeDetails> sourceDatanodes = task.getSources();
-
- LOG.info("Starting replication of container {} from {}", containerID,
- sourceDatanodes);
-
- try {
- HddsVolume targetVolume = chooseNextVolume();
- // Wait for the download. This thread pool is limiting the parallel
- // downloads, so it's ok to block here and wait for the full download.
- Path tarFilePath =
- downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
- getUntarDirectory(targetVolume));
- if (tarFilePath == null) {
- task.setStatus(Status.FAILED);
- return;
+ LOG.error("Got exception while deleting temporary container file: "
+ + tarFilePath.toAbsolutePath(), ex);
}
- long bytes = Files.size(tarFilePath);
- LOG.info("Container {} is downloaded with size {}, starting to import.",
- containerID, bytes);
- task.setTransferredBytes(bytes);
-
- importContainer(containerID, tarFilePath, targetVolume);
-
- LOG.info("Container {} is replicated successfully", containerID);
- task.setStatus(Status.DONE);
- } catch (IOException e) {
- LOG.error("Container {} replication was unsuccessful.", containerID, e);
- task.setStatus(Status.FAILED);
}
}
- private HddsVolume chooseNextVolume() throws IOException {
+ HddsVolume chooseNextVolume() throws IOException {
// Choose volume that can hold both container in tmp and dest directory
return volumeChoosingPolicy.chooseVolume(
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
@@ -177,8 +123,4 @@ public class DownloadAndImportReplicator implements
ContainerReplicator {
return Paths.get(hddsVolume.getVolumeRootDir())
.resolve(CONTAINER_COPY_TMP_DIR).resolve(CONTAINER_COPY_DIR);
}
-
- private List<HddsVolume> getHddsVolumesList() {
- return StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
- }
-}
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
new file mode 100644
index 0000000000..cb9264f2b5
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerUploader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client-side interface for sending a container to a target datanode.
+ */
+public interface ContainerUploader {
+ OutputStream startUpload(long containerId, DatanodeDetails target,
+ CompletableFuture<Void> callback) throws IOException;
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
new file mode 100644
index 0000000000..3cb6dfc7f5
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+
+/**
+ * Output stream adapter for CopyContainerResponse.
+ */
+class CopyContainerResponseStream
+ extends GrpcOutputStream<CopyContainerResponseProto> {
+
+ CopyContainerResponseStream(
+ StreamObserver<CopyContainerResponseProto> streamObserver,
+ long containerId, int bufferSize) {
+ super(streamObserver, containerId, bufferSize);
+ }
+
+ protected void sendPart(boolean eof, int length, ByteString data) {
+ CopyContainerResponseProto response =
+ CopyContainerResponseProto.newBuilder()
+ .setContainerID(getContainerId())
+ .setData(data)
+ .setEof(eof)
+ .setReadOffset(getWrittenBytes())
+ .setLen(length)
+ .build();
+ getStreamObserver().onNext(response);
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index c1d99c9148..59f165f93b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -17,35 +17,18 @@
*/
package org.apache.hadoop.ozone.container.replication;
-import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.List;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-
/**
* Default replication implementation.
* <p>
@@ -57,77 +40,14 @@ public class DownloadAndImportReplicator implements
ContainerReplicator {
public static final Logger LOG =
LoggerFactory.getLogger(DownloadAndImportReplicator.class);
- public static final String CONTAINER_COPY_DIR = "container-copy";
- public static final String CONTAINER_COPY_TMP_DIR = "tmp";
-
- private final ContainerSet containerSet;
- private final ContainerController controller;
private final ContainerDownloader downloader;
- private final TarContainerPacker packer;
- private final MutableVolumeSet volumeSet;
- private final VolumeChoosingPolicy volumeChoosingPolicy;
- private final long containerSize;
+ private final ContainerImporter containerImporter;
public DownloadAndImportReplicator(
- ConfigurationSource conf,
- ContainerSet containerSet,
- ContainerController controller,
- ContainerDownloader downloader,
- TarContainerPacker packer,
- MutableVolumeSet volumeSet) {
- this.containerSet = containerSet;
- this.controller = controller;
+ ContainerImporter containerImporter,
+ ContainerDownloader downloader) {
this.downloader = downloader;
- this.packer = packer;
- this.volumeSet = volumeSet;
- try {
- this.volumeChoosingPolicy = conf.getClass(
- HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
- .class, VolumeChoosingPolicy.class).newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- this.containerSize = (long) conf.getStorageSize(
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-
- }
-
- public void importContainer(long containerID, Path tarFilePath,
- HddsVolume hddsVolume) throws IOException {
-
- HddsVolume targetVolume = hddsVolume;
- if (targetVolume == null) {
- targetVolume = chooseNextVolume();
- }
- KeyValueContainerData originalContainerData;
- try {
- try (FileInputStream tmpContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
- byte[] containerDescriptorYaml =
- packer.unpackContainerDescriptor(tmpContainerTarStream);
- originalContainerData = (KeyValueContainerData) ContainerDataYaml
- .readContainer(containerDescriptorYaml);
- }
- originalContainerData.setVolume(targetVolume);
-
- try (FileInputStream tempContainerTarStream = new FileInputStream(
- tarFilePath.toFile())) {
-
- Container container = controller.importContainer(
- originalContainerData, tempContainerTarStream, packer);
-
- containerSet.addContainer(container);
- }
-
- } finally {
- try {
- Files.delete(tarFilePath);
- } catch (Exception ex) {
- LOG.error("Got exception while deleting downloaded container file: "
- + tarFilePath.toAbsolutePath().toString(), ex);
- }
- }
+ this.containerImporter = containerImporter;
}
@Override
@@ -140,12 +60,12 @@ public class DownloadAndImportReplicator implements
ContainerReplicator {
sourceDatanodes);
try {
- HddsVolume targetVolume = chooseNextVolume();
+ HddsVolume targetVolume = containerImporter.chooseNextVolume();
// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
- getUntarDirectory(targetVolume));
+ ContainerImporter.getUntarDirectory(targetVolume));
if (tarFilePath == null) {
task.setStatus(Status.FAILED);
return;
@@ -155,7 +75,7 @@ public class DownloadAndImportReplicator implements
ContainerReplicator {
containerID, bytes);
task.setTransferredBytes(bytes);
- importContainer(containerID, tarFilePath, targetVolume);
+ containerImporter.importContainer(containerID, tarFilePath,
targetVolume);
LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
@@ -165,20 +85,4 @@ public class DownloadAndImportReplicator implements
ContainerReplicator {
}
}
- private HddsVolume chooseNextVolume() throws IOException {
- // Choose volume that can hold both container in tmp and dest directory
- return volumeChoosingPolicy.chooseVolume(
- StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
- containerSize * 2);
- }
-
- public static Path getUntarDirectory(HddsVolume hddsVolume)
- throws IOException {
- return Paths.get(hddsVolume.getVolumeRootDir())
- .resolve(CONTAINER_COPY_TMP_DIR).resolve(CONTAINER_COPY_DIR);
- }
-
- private List<HddsVolume> getHddsVolumesList() {
- return StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
- }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
new file mode 100644
index 0000000000..aa7322428e
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Uploads container to target datanode via gRPC.
+ */
+public class GrpcContainerUploader implements ContainerUploader {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GrpcContainerUploader.class);
+
+ private final SecurityConfig securityConfig;
+ private final CertificateClient certClient;
+ private final CopyContainerCompression compression;
+
+ public GrpcContainerUploader(
+ ConfigurationSource conf, CertificateClient certClient) {
+ this.certClient = certClient;
+ securityConfig = new SecurityConfig(conf);
+ compression = CopyContainerCompression.getConf(conf);
+ }
+
+ @Override
+ public OutputStream startUpload(long containerId, DatanodeDetails target,
+ CompletableFuture<Void> callback) throws IOException {
+ GrpcReplicationClient client =
+ new GrpcReplicationClient(target.getIpAddress(),
+ target.getPort(Port.Name.REPLICATION).getValue(),
+ securityConfig, certClient, compression.toString());
+ StreamObserver<SendContainerRequest> requestStream = client.upload(
+ new SendContainerResponseStreamObserver(containerId, target,
callback));
+ return new SendContainerOutputStream(requestStream, containerId,
+ GrpcReplicationService.BUFFER_SIZE);
+ }
+
+ /**
+ *
+ */
+ private static class SendContainerResponseStreamObserver
+ implements StreamObserver<SendContainerResponse> {
+ private final long containerId;
+ private final DatanodeDetails target;
+ private final CompletableFuture<Void> callback;
+
+ SendContainerResponseStreamObserver(long containerId,
+ DatanodeDetails target, CompletableFuture<Void> callback) {
+ this.containerId = containerId;
+ this.target = target;
+ this.callback = callback;
+ }
+
+ @Override
+ public void onNext(SendContainerResponse sendContainerResponse) {
+ LOG.info("Response for upload container {} to {}", containerId, target);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.warn("Failed to upload container {} to {}", containerId, target, t);
+ callback.completeExceptionally(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ LOG.info("Finished uploading container {} to {}", containerId, target);
+ callback.complete(null);
+ }
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
index c09c8f6743..280e8cef75 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.container.replication;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
@@ -31,12 +30,12 @@ import java.io.OutputStream;
* Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
* Data is buffered in a limited buffer of the specified size.
*/
-class GrpcOutputStream extends OutputStream {
+abstract class GrpcOutputStream<T> extends OutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcOutputStream.class);
- private final StreamObserver<CopyContainerResponseProto> responseObserver;
+ private final StreamObserver<T> streamObserver;
private final ByteString.Output buffer;
@@ -46,10 +45,9 @@ class GrpcOutputStream extends OutputStream {
private long writtenBytes;
- GrpcOutputStream(
- StreamObserver<CopyContainerResponseProto> responseObserver,
+ GrpcOutputStream(StreamObserver<T> streamObserver,
long containerId, int bufferSize) {
- this.responseObserver = responseObserver;
+ this.streamObserver = streamObserver;
this.containerId = containerId;
this.bufferSize = bufferSize;
buffer = ByteString.newOutput(bufferSize);
@@ -63,7 +61,7 @@ class GrpcOutputStream extends OutputStream {
flushBuffer(false);
}
} catch (Exception ex) {
- responseObserver.onError(ex);
+ streamObserver.onError(ex);
}
}
@@ -94,7 +92,7 @@ class GrpcOutputStream extends OutputStream {
len = Math.min(bufferSize, remaining);
}
} catch (Exception ex) {
- responseObserver.onError(ex);
+ streamObserver.onError(ex);
}
}
@@ -103,27 +101,34 @@ class GrpcOutputStream extends OutputStream {
flushBuffer(true);
LOG.info("Sent {} bytes for container {}",
writtenBytes, containerId);
- responseObserver.onCompleted();
+ streamObserver.onCompleted();
buffer.close();
}
+ protected long getContainerId() {
+ return containerId;
+ }
+
+ protected long getWrittenBytes() {
+ return writtenBytes;
+ }
+
+ protected StreamObserver<T> getStreamObserver() {
+ return streamObserver;
+ }
+
private void flushBuffer(boolean eof) {
int length = buffer.size();
if (length > 0) {
ByteString data = buffer.toByteString();
LOG.debug("Sending {} bytes (of type {}) for container {}",
length, data.getClass().getSimpleName(), containerId);
- CopyContainerResponseProto response =
- CopyContainerResponseProto.newBuilder()
- .setContainerID(containerId)
- .setData(data)
- .setEof(eof)
- .setReadOffset(writtenBytes)
- .setLen(length)
- .build();
- responseObserver.onNext(response);
+ sendPart(eof, length, data);
writtenBytes += length;
buffer.reset();
}
}
+
+ protected abstract void sendPart(boolean eof, int length, ByteString data);
+
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index fa4140040c..be9a0ece48 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
import
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
import
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
import org.apache.hadoop.hdds.security.ssl.KeyStoresFactory;
@@ -60,12 +62,10 @@ public class GrpcReplicationClient implements AutoCloseable
{
private final IntraDatanodeProtocolServiceStub client;
- private final Path workingDirectory;
-
private final ContainerProtos.CopyContainerCompressProto compression;
public GrpcReplicationClient(
- String host, int port, Path workingDir,
+ String host, int port,
SecurityConfig secConfig, CertificateClient certClient,
String compression)
throws IOException {
@@ -92,12 +92,11 @@ public class GrpcReplicationClient implements AutoCloseable
{
}
channel = channelBuilder.build();
client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
- workingDirectory = workingDir;
this.compression =
ContainerProtos.CopyContainerCompressProto.valueOf(compression);
}
- public CompletableFuture<Path> download(long containerId) {
+ public CompletableFuture<Path> download(long containerId, Path dir) {
CopyContainerRequestProto request =
CopyContainerRequestProto.newBuilder()
.setContainerID(containerId)
@@ -108,7 +107,7 @@ public class GrpcReplicationClient implements AutoCloseable
{
CompletableFuture<Path> response = new CompletableFuture<>();
- Path destinationPath = getWorkingDirectory()
+ Path destinationPath = dir
.resolve(ContainerUtils.getContainerTarName(containerId));
client.download(request,
@@ -117,8 +116,9 @@ public class GrpcReplicationClient implements AutoCloseable
{
return response;
}
- private Path getWorkingDirectory() {
- return workingDirectory;
+ public StreamObserver<SendContainerRequest> upload(
+ StreamObserver<SendContainerResponse> responseObserver) {
+ return client.upload(responseObserver);
}
public void shutdown() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 7246ae99f0..533a3b40fe 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -19,9 +19,12 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
+import java.io.OutputStream;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
import
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -37,12 +40,15 @@ public class GrpcReplicationService extends
private static final Logger LOG =
LoggerFactory.getLogger(GrpcReplicationService.class);
- private static final int BUFFER_SIZE = 1024 * 1024;
+ static final int BUFFER_SIZE = 1024 * 1024;
private final ContainerReplicationSource source;
+ private final ContainerImporter importer;
- public GrpcReplicationService(ContainerReplicationSource source) {
+ public GrpcReplicationService(ContainerReplicationSource source,
+ ContainerImporter importer) {
this.source = source;
+ this.importer = importer;
}
@Override
@@ -55,8 +61,8 @@ public class GrpcReplicationService extends
LOG.info("Streaming container data ({}) to other datanode " +
"with compression {}", containerID, compression);
try {
- GrpcOutputStream outputStream =
- new GrpcOutputStream(responseObserver, containerID, BUFFER_SIZE);
+ OutputStream outputStream = new CopyContainerResponseStream(
+ responseObserver, containerID, BUFFER_SIZE);
source.copyData(containerID, outputStream, compression);
} catch (IOException e) {
LOG.error("Error streaming container {}", containerID, e);
@@ -64,4 +70,10 @@ public class GrpcReplicationService extends
}
}
+ @Override
+ public StreamObserver<SendContainerRequest> upload(
+ StreamObserver<SendContainerResponse> responseObserver) {
+
+ return new SendContainerRequestHandler(importer, responseObserver);
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
new file mode 100644
index 0000000000..aa1a2966f7
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+
+/**
+ * Pushes the container to the target datanode.
+ */
+public class PushReplicator implements ContainerReplicator {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PushReplicator.class);
+
+ private final ContainerReplicationSource source;
+ private final ContainerUploader uploader;
+
+ public PushReplicator(ContainerReplicationSource source,
+ ContainerUploader uploader) {
+ this.source = source;
+ this.uploader = uploader;
+ }
+
+ @Override
+ public void replicate(ReplicationTask task) {
+ long containerID = task.getContainerId();
+ DatanodeDetails target = task.getTarget();
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+
+ source.prepare(containerID);
+
+ OutputStream output = null;
+ try {
+ output = uploader.startUpload(containerID, target, fut);
+ source.copyData(containerID, output, NO_COMPRESSION.name());
+ fut.get();
+ task.setStatus(Status.DONE);
+ } catch (Exception e) {
+ LOG.warn("Container {} replication was unsuccessful.", containerID, e);
+ task.setStatus(Status.FAILED);
+ } finally {
+ // output may have already been closed, ignore such errors
+ IOUtils.cleanupWithLogger(LOG, output);
+ }
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index fcad690d4f..cdc1975360 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -59,13 +59,15 @@ public class ReplicationServer {
private ContainerController controller;
private int port;
+ private final ContainerImporter importer;
public ReplicationServer(ContainerController controller,
ReplicationConfig replicationConfig, SecurityConfig secConf,
- CertificateClient caClient) {
+ CertificateClient caClient, ContainerImporter importer) {
this.secConf = secConf;
this.caClient = caClient;
this.controller = controller;
+ this.importer = importer;
this.port = replicationConfig.getPort();
init();
}
@@ -74,7 +76,8 @@ public class ReplicationServer {
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.addService(ServerInterceptors.intercept(new GrpcReplicationService(
- new OnDemandContainerReplicationSource(controller)
+ new OnDemandContainerReplicationSource(controller),
+ importer
), new GrpcServerInterceptor()));
if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 3f61273885..482ca1ceb4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.ozone.container.replication;
import java.time.Clock;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -48,7 +48,8 @@ public class ReplicationSupervisor {
LoggerFactory.getLogger(ReplicationSupervisor.class);
private final ContainerSet containerSet;
- private final ContainerReplicator replicator;
+ private final ContainerReplicator pullReplicator;
+ private final ContainerReplicator pushReplicator;
private final ExecutorService executor;
private final StateContext context;
private final Clock clock;
@@ -63,16 +64,18 @@ public class ReplicationSupervisor {
* or queued for download. Tracked so we don't schedule > 1
* concurrent download for the same container.
*/
- private final KeySetView<Object, Boolean> containersInFlight;
+ private final Set<ReplicationTask> inFlight;
@VisibleForTesting
ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
- ContainerReplicator replicator, ExecutorService executor,
+ ContainerReplicator pullReplicator, ContainerReplicator pushReplicator,
+ ExecutorService executor,
Clock clock) {
this.containerSet = containerSet;
- this.replicator = replicator;
- this.containersInFlight = ConcurrentHashMap.newKeySet();
+ this.pullReplicator = pullReplicator;
+ this.pushReplicator = pushReplicator;
+ this.inFlight = ConcurrentHashMap.newKeySet();
this.executor = executor;
this.context = context;
this.clock = clock;
@@ -80,22 +83,24 @@ public class ReplicationSupervisor {
public ReplicationSupervisor(
ContainerSet containerSet, StateContext context,
- ContainerReplicator replicator, ReplicationConfig replicationConfig,
- Clock clock) {
- this(containerSet, context, replicator, new ThreadPoolExecutor(
- replicationConfig.getReplicationMaxStreams(),
- replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("ContainerReplicationThread-%d")
- .build()), clock);
+ ContainerReplicator pullReplicator, ContainerReplicator pushReplicator,
+ ReplicationConfig replicationConfig, Clock clock) {
+ this(containerSet, context, pullReplicator, pushReplicator,
+ new ThreadPoolExecutor(
+ replicationConfig.getReplicationMaxStreams(),
+ replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ContainerReplicationThread-%d")
+ .build()),
+ clock);
}
/**
* Queue an asynchronous download of the given container.
*/
public void addTask(ReplicationTask task) {
- if (containersInFlight.add(task.getContainerId())) {
+ if (inFlight.add(task)) {
executor.execute(new TaskRunner(task));
}
}
@@ -125,7 +130,7 @@ public class ReplicationSupervisor {
* @return Count of in-flight replications.
*/
public int getInFlightReplications() {
- return containersInFlight.size();
+ return inFlight.size();
}
/**
@@ -170,37 +175,35 @@ public class ReplicationSupervisor {
}
}
- if (containerSet.getContainer(task.getContainerId()) != null) {
+ final boolean pull = task.getTarget() == null;
+ if (containerSet.getContainer(task.getContainerId()) != null && pull) {
LOG.debug("Container {} has already been downloaded.", containerId);
return;
}
- task.setStatus(Status.DOWNLOADING);
+ task.setStatus(Status.IN_PROGRESS);
+ ContainerReplicator replicator = pull ? pullReplicator :
pushReplicator;
replicator.replicate(task);
if (task.getStatus() == Status.FAILED) {
- LOG.error(
- "Container {} can't be downloaded from any of the datanodes.",
- containerId);
+ LOG.error("Failed {}", this);
failureCounter.incrementAndGet();
} else if (task.getStatus() == Status.DONE) {
- LOG.info("Container {} is replicated.", containerId);
+ LOG.info("Successful {}", this);
successCounter.incrementAndGet();
}
} catch (Exception e) {
task.setStatus(Status.FAILED);
- LOG.error("Encountered error while replicating container {}.",
- containerId, e);
+ LOG.error("Failed {}", this, e);
failureCounter.incrementAndGet();
} finally {
- containersInFlight.remove(containerId);
+ inFlight.remove(task);
}
}
@Override
public String toString() {
- return "replicate container command for container "
- + task.getContainerId();
+ return task.getCommand().toString();
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
index 7c57a73b33..1e90ff0339 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -31,15 +31,9 @@ public class ReplicationTask {
private volatile Status status = Status.QUEUED;
- private final long containerId;
-
- private final List<DatanodeDetails> sources;
-
private final Instant queued = Instant.now();
- private final long deadlineMsSinceEpoch;
-
- private final long term;
+ private final ReplicateContainerCommand cmd;
/**
* Counter for the transferred bytes.
@@ -47,20 +41,7 @@ public class ReplicationTask {
private long transferredBytes;
public ReplicationTask(ReplicateContainerCommand cmd) {
- this.containerId = cmd.getContainerID();
- this.sources = cmd.getSourceDatanodes();
- this.deadlineMsSinceEpoch = cmd.getDeadline();
- this.term = cmd.getTerm();
- }
-
- /**
- * Intended to only be used in tests.
- */
- protected ReplicationTask(
- long containerId,
- List<DatanodeDetails> sources
- ) {
- this(new ReplicateContainerCommand(containerId, sources));
+ this.cmd = cmd;
}
/**
@@ -68,7 +49,7 @@ public class ReplicationTask {
* A returned value of zero indicates no deadline.
*/
public long getDeadline() {
- return deadlineMsSinceEpoch;
+ return cmd.getDeadline();
}
@Override
@@ -80,20 +61,21 @@ public class ReplicationTask {
return false;
}
ReplicationTask that = (ReplicationTask) o;
- return containerId == that.containerId;
+ return getContainerId() == that.getContainerId() &&
+ Objects.equals(getTarget(), that.getTarget());
}
@Override
public int hashCode() {
- return Objects.hash(containerId);
+ return Objects.hash(getContainerId(), getTarget());
}
public long getContainerId() {
- return containerId;
+ return cmd.getContainerID();
}
public List<DatanodeDetails> getSources() {
- return sources;
+ return cmd.getSourceDatanodes();
}
public Status getStatus() {
@@ -108,8 +90,7 @@ public class ReplicationTask {
public String toString() {
return "ReplicationTask{" +
"status=" + status +
- ", containerId=" + containerId +
- ", sources=" + sources +
+ ", cmd={" + cmd + "}" +
", queued=" + queued +
'}';
}
@@ -127,7 +108,15 @@ public class ReplicationTask {
}
long getTerm() {
- return term;
+ return cmd.getTerm();
+ }
+
+ DatanodeDetails getTarget() {
+ return cmd.getTargetDatanode();
+ }
+
+ ReplicateContainerCommand getCommand() {
+ return cmd;
}
/**
@@ -135,7 +124,7 @@ public class ReplicationTask {
*/
public enum Status {
QUEUED,
- DOWNLOADING,
+ IN_PROGRESS,
FAILED,
DONE
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
new file mode 100644
index 0000000000..622bd4f4a3
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+
/**
 * Output stream adapter that emits each buffered chunk of container data as a
 * {@code SendContainerRequest} message on the upload stream.
 */
class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {
  SendContainerOutputStream(
      StreamObserver<SendContainerRequest> streamObserver,
      long containerId, int bufferSize) {
    super(streamObserver, containerId, bufferSize);
  }

  @Override
  protected void sendPart(boolean eof, int length, ByteString data) {
    // eof/length are not set on the message: data carries its own size, and
    // end-of-stream is signalled separately when the stream is closed.
    SendContainerRequest request = SendContainerRequest.newBuilder()
        .setContainerID(getContainerId())
        .setData(data)
        .setOffset(getWrittenBytes())
        .build();
    getStreamObserver().onNext(request);
  }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
new file mode 100644
index 0000000000..b12aad452c
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.apache.ratis.util.Preconditions.assertSame;
+
+/**
+ * Handles incoming container pushed by other datanode.
+ */
+class SendContainerRequestHandler
+ implements StreamObserver<SendContainerRequest> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SendContainerRequestHandler.class);
+
+ private final ContainerImporter importer;
+ private final StreamObserver<SendContainerResponse> responseObserver;
+
+ private long containerId = -1;
+ private long nextOffset;
+ private OutputStream output;
+ private HddsVolume volume;
+ private Path path;
+
+ SendContainerRequestHandler(
+ ContainerImporter importer,
+ StreamObserver<SendContainerResponse> responseObserver) {
+ this.importer = importer;
+ this.responseObserver = responseObserver;
+ }
+
+ @Override
+ public void onNext(SendContainerRequest req) {
+ try {
+ final long length = req.getData().size();
+ LOG.info("Received part for container id:{} offset:{} len:{}",
+ req.getContainerID(), req.getOffset(), length);
+
+ assertSame(nextOffset, req.getOffset(), "offset");
+
+ if (containerId == -1) {
+ containerId = req.getContainerID();
+ volume = importer.chooseNextVolume();
+ Path dir = ContainerImporter.getUntarDirectory(volume);
+ Files.createDirectories(dir);
+ path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
+ output = Files.newOutputStream(path);
+ }
+
+ assertSame(containerId, req.getContainerID(), "containerID");
+
+ req.getData().writeTo(output);
+
+ nextOffset += length;
+ } catch (Throwable t) {
+ onError(t);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.error("Error", t);
+ closeOutput();
+ deleteTarball();
+ responseObserver.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ if (output == null) {
+ LOG.warn("Received container without any parts");
+ return;
+ }
+
+ LOG.info("Received all parts for container {}", containerId);
+ closeOutput();
+
+ try {
+ importer.importContainer(containerId, path, volume);
+ LOG.info("Imported container {}", containerId);
+ responseObserver.onNext(SendContainerResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ } catch (Throwable t) {
+ LOG.info("Failed to import container {}", containerId, t);
+ deleteTarball();
+ responseObserver.onError(t);
+ }
+ }
+
+ private void closeOutput() {
+ IOUtils.cleanupWithLogger(LOG, output);
+ output = null;
+ }
+
+ private void deleteTarball() {
+ try {
+ Files.deleteIfExists(path);
+ } catch (IOException e) {
+ LOG.warn("Error removing {}", path);
+ }
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 329fd64fa2..0e81e512d3 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -37,9 +37,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator.CONTAINER_COPY_DIR;
-
-
/**
* Simple ContainerDownloaderImplementation to download the missing container
* from the first available datanode.
@@ -70,7 +67,7 @@ public class SimpleContainerDownloader implements
ContainerDownloader {
if (downloadDir == null) {
downloadDir = Paths.get(System.getProperty("java.io.tmpdir"))
- .resolve(CONTAINER_COPY_DIR);
+ .resolve(ContainerImporter.CONTAINER_COPY_DIR);
}
final List<DatanodeDetails> shuffledDatanodes =
@@ -118,9 +115,9 @@ public class SimpleContainerDownloader implements
ContainerDownloader {
GrpcReplicationClient grpcReplicationClient =
new GrpcReplicationClient(datanode.getIpAddress(),
datanode.getPort(Name.REPLICATION).getValue(),
- downloadDir, securityConfig, certClient, compression);
+ securityConfig, certClient, compression);
- result = grpcReplicationClient.download(containerId)
+ result = grpcReplicationClient.download(containerId, downloadDir)
.whenComplete((r, ex) -> {
try {
grpcReplicationClient.close();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index 3f5959a281..e090df42fe 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -33,29 +34,48 @@ import org.apache.hadoop.hdds.protocol.proto
import com.google.common.base.Preconditions;
+import static java.util.Collections.emptyList;
+
/**
* SCM command to request replication of a container.
*/
-public class ReplicateContainerCommand
+public final class ReplicateContainerCommand
extends SCMCommand<ReplicateContainerCommandProto> {
private final long containerID;
private final List<DatanodeDetails> sourceDatanodes;
+ private final DatanodeDetails targetDatanode;
private int replicaIndex = 0;
- public ReplicateContainerCommand(long containerID,
+ public static ReplicateContainerCommand fromSources(long containerID,
List<DatanodeDetails> sourceDatanodes) {
- super();
+ return new ReplicateContainerCommand(containerID, sourceDatanodes, null);
+ }
+
+ public static ReplicateContainerCommand toTarget(long containerID,
+ DatanodeDetails target) {
+ return new ReplicateContainerCommand(containerID, emptyList(), target);
+ }
+
+ public static ReplicateContainerCommand forTest(long containerID) {
+ return new ReplicateContainerCommand(containerID, emptyList(), null);
+ }
+
+ private ReplicateContainerCommand(long containerID,
+ List<DatanodeDetails> sourceDatanodes, DatanodeDetails target) {
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
+ this.targetDatanode = target;
}
// Should be called only for protobuf conversion
- public ReplicateContainerCommand(long containerID,
- List<DatanodeDetails> sourceDatanodes, long id) {
+ private ReplicateContainerCommand(long containerID,
+ List<DatanodeDetails> sourceDatanodes, long id,
+ DatanodeDetails targetDatanode) {
super(id);
this.containerID = containerID;
this.sourceDatanodes = sourceDatanodes;
+ this.targetDatanode = targetDatanode;
}
public void setReplicaIndex(int index) {
@@ -76,6 +96,9 @@ public class ReplicateContainerCommand
builder.addSources(dd.getProtoBufMessage());
}
builder.setReplicaIndex(replicaIndex);
+ if (targetDatanode != null) {
+ builder.setTarget(targetDatanode.getProtoBufMessage());
+ }
return builder.build();
}
@@ -83,15 +106,19 @@ public class ReplicateContainerCommand
ReplicateContainerCommandProto protoMessage) {
Preconditions.checkNotNull(protoMessage);
- List<DatanodeDetails> datanodeDetails =
- protoMessage.getSourcesList()
- .stream()
+ List<DatanodeDetailsProto> sources = protoMessage.getSourcesList();
+ List<DatanodeDetails> sourceNodes = !sources.isEmpty()
+ ? sources.stream()
.map(DatanodeDetails::getFromProtoBuf)
- .collect(Collectors.toList());
+ .collect(Collectors.toList())
+ : emptyList();
+ DatanodeDetails targetNode = protoMessage.hasTarget()
+ ? DatanodeDetails.getFromProtoBuf(protoMessage.getTarget())
+ : null;
ReplicateContainerCommand cmd =
new ReplicateContainerCommand(protoMessage.getContainerID(),
- datanodeDetails, protoMessage.getCmdId());
+ sourceNodes, protoMessage.getCmdId(), targetNode);
if (protoMessage.hasReplicaIndex()) {
cmd.setReplicaIndex(protoMessage.getReplicaIndex());
}
@@ -106,6 +133,10 @@ public class ReplicateContainerCommand
return sourceDatanodes;
}
+ public DatanodeDetails getTargetDatanode() {
+ return targetDatanode;
+ }
+
public int getReplicaIndex() {
return replicaIndex;
}
@@ -116,7 +147,11 @@ public class ReplicateContainerCommand
sb.append(getType());
sb.append(": containerId: ").append(getContainerID());
sb.append(", replicaIndex: ").append(getReplicaIndex());
- sb.append(", sourceNodes: ").append(sourceDatanodes);
+ if (targetDatanode != null) {
+ sb.append(", targetNode: ").append(targetDatanode);
+ } else {
+ sb.append(", sourceNodes: ").append(sourceDatanodes);
+ }
return sb.toString();
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 331d9f2bd9..e05fe45229 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.ozone.container.common.statemachine;
import static
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
-import static java.util.Collections.emptyList;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import static
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
import static org.apache.ozone.test.GenericTestUtils.waitFor;
@@ -615,10 +614,10 @@ public class TestStateContext {
@Test
public void testCommandQueueSummary() throws IOException {
StateContext ctx = createSubject();
- ctx.addCommand(new ReplicateContainerCommand(1, null));
+ ctx.addCommand(ReplicateContainerCommand.forTest(1));
ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
- ctx.addCommand(new ReplicateContainerCommand(2, null));
- ctx.addCommand(new ReplicateContainerCommand(3, null));
+ ctx.addCommand(ReplicateContainerCommand.forTest(2));
+ ctx.addCommand(ReplicateContainerCommand.forTest(3));
ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
@@ -682,7 +681,7 @@ public class TestStateContext {
}
private static SCMCommand<?> someCommand() {
- return new ReplicateContainerCommand(1, emptyList());
+ return ReplicateContainerCommand.forTest(1);
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
similarity index 88%
rename from
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
rename to
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index 0da88e1645..e342ffb338 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.container.replication;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.BeforeEach;
@@ -44,23 +43,30 @@ import static org.mockito.Mockito.verify;
* Tests for {@code GrpcOutputStream}.
*/
@ExtendWith(MockitoExtension.class)
-public class TestGrpcOutputStream {
+abstract class GrpcOutputStreamTest<T> {
private static final Random RND = new Random();
private final long containerId = RND.nextLong();
private final int bufferSize = RND.nextInt(1024) + 128 + 1;
+ private final Class<? extends T> clazz;
@Mock
- private StreamObserver<CopyContainerResponseProto> observer;
+ private StreamObserver<T> observer;
private OutputStream subject;
+ protected GrpcOutputStreamTest(Class<? extends T> clazz) {
+ this.clazz = clazz;
+ }
+
@BeforeEach
- public void setUp() throws Exception {
- subject = new GrpcOutputStream(observer, containerId, bufferSize);
+ public void setUp() {
+ subject = createSubject();
}
+ protected abstract OutputStream createSubject();
+
@Test
public void seriesOfBytesInSingleResponse() throws IOException {
byte[] bytes = getRandomBytes(5);
@@ -158,25 +164,19 @@ public class TestGrpcOutputStream {
expectedResponseCount++;
}
- ArgumentCaptor<CopyContainerResponseProto> captor =
- ArgumentCaptor.forClass(CopyContainerResponseProto.class);
+ ArgumentCaptor<T> captor =
+ ArgumentCaptor.forClass(clazz);
verify(observer, times(expectedResponseCount)).onNext(captor.capture());
- List<CopyContainerResponseProto> responses =
+ List<T> responses =
new ArrayList<>(captor.getAllValues());
for (int i = 0; i < expectedResponseCount; i++) {
- CopyContainerResponseProto response = responses.get(i);
- assertEquals(containerId, response.getContainerID());
-
+ T response = responses.get(i);
int expectedOffset = i * bufferSize;
- assertEquals(expectedOffset, response.getReadOffset());
-
int size = Math.min(bufferSize, bytes.length - expectedOffset);
- assertEquals(size, response.getLen());
-
byte[] part = new byte[size];
System.arraycopy(bytes, expectedOffset, part, 0, size);
- ByteString data = response.getData();
+ ByteString data = verifyPart(response, expectedOffset, size);
assertArrayEquals(part, data.toByteArray());
// we don't want concatenated ByteStrings
@@ -186,6 +186,9 @@ public class TestGrpcOutputStream {
verify(observer, times(1)).onCompleted();
}
+ protected abstract ByteString verifyPart(
+ T response, int expectedOffset, int size);
+
private static byte[] concat(byte[]... parts) {
int length = Arrays.stream(parts).mapToInt(each -> each.length).sum();
byte[] bytes = new byte[length];
@@ -210,4 +213,15 @@ public class TestGrpcOutputStream {
return bytes;
}
+ long getContainerId() {
+ return containerId;
+ }
+
+ StreamObserver<T> getObserver() {
+ return observer;
+ }
+
+ int getBufferSize() {
+ return bufferSize;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
index 613f463185..11c362c8ab 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorScheduling.java
@@ -34,6 +34,8 @@ import
org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.fromSources;
+
/**
* Helper to check scheduling efficiency.
* <p>
@@ -113,7 +115,7 @@ public class ReplicationSupervisorScheduling {
}
}
- }, replicationConfig, Clock.system(ZoneId.systemDefault()));
+ }, null, replicationConfig, Clock.system(ZoneId.systemDefault()));
final long start = System.currentTimeMillis();
@@ -121,7 +123,7 @@ public class ReplicationSupervisorScheduling {
for (int i = 0; i < 100; i++) {
List<DatanodeDetails> sources = new ArrayList<>();
sources.add(datanodes.get(random.nextInt(datanodes.size())));
- rs.addTask(new ReplicationTask(i, sources));
+ rs.addTask(new ReplicationTask(fromSources(i, sources)));
}
rs.shutdownAfterFinish();
final long executionTime = System.currentTimeMillis() - start;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerResponseStream.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerResponseStream.java
new file mode 100644
index 0000000000..54fc9b7e57
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestCopyContainerResponseStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for {@link CopyContainerResponseStream}.
+ */
+class TestCopyContainerResponseStream
+ extends GrpcOutputStreamTest<CopyContainerResponseProto> {
+
+ TestCopyContainerResponseStream() {
+ super(CopyContainerResponseProto.class);
+ }
+
+ @Override
+ protected OutputStream createSubject() {
+ return new CopyContainerResponseStream(getObserver(),
+ getContainerId(), getBufferSize());
+ }
+
+ protected ByteString verifyPart(CopyContainerResponseProto response,
+ int expectedOffset, int size) {
+ assertEquals(getContainerId(), response.getContainerID());
+ assertEquals(expectedOffset, response.getReadOffset());
+ assertEquals(size, response.getLen());
+ return response.getData();
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
index 4f7bd048f9..987c47835d 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.container.replication;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
@@ -28,6 +27,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static
org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.forTest;
+
/**
* Test replicator metric measurement.
*/
@@ -63,9 +64,9 @@ public class TestMeasuredReplicator {
@Test
public void measureFailureSuccessAndBytes() {
//WHEN
- measuredReplicator.replicate(new ReplicationTask(1L, new ArrayList<>()));
- measuredReplicator.replicate(new ReplicationTask(2L, new ArrayList<>()));
- measuredReplicator.replicate(new ReplicationTask(3L, new ArrayList<>()));
+ measuredReplicator.replicate(new ReplicationTask(forTest(1)));
+ measuredReplicator.replicate(new ReplicationTask(forTest(2)));
+ measuredReplicator.replicate(new ReplicationTask(forTest(3)));
//THEN
//even containers should be failed
@@ -83,9 +84,9 @@ public class TestMeasuredReplicator {
public void testReplicationTime() throws Exception {
//WHEN
//will wait at least the 300ms
- measuredReplicator.replicate(new ReplicationTask(101L, new ArrayList<>()));
- measuredReplicator.replicate(new ReplicationTask(201L, new ArrayList<>()));
- measuredReplicator.replicate(new ReplicationTask(300L, new ArrayList<>()));
+ measuredReplicator.replicate(new ReplicationTask(forTest(101)));
+ measuredReplicator.replicate(new ReplicationTask(forTest(201)));
+ measuredReplicator.replicate(new ReplicationTask(forTest(300)));
//THEN
//even containers should be failed
@@ -101,7 +102,7 @@ public class TestMeasuredReplicator {
public void testFailureTimeSuccessExcluded() {
//WHEN
//will wait at least the 15ms
- measuredReplicator.replicate(new ReplicationTask(15L, new ArrayList<>()));
+ measuredReplicator.replicate(new ReplicationTask(forTest(15)));
//THEN
//even containers should be failed, supposed to be zero
@@ -112,7 +113,7 @@ public class TestMeasuredReplicator {
public void testSuccessTimeFailureExcluded() {
//WHEN
//will wait at least the 10ms
- measuredReplicator.replicate(new ReplicationTask(10L, new ArrayList<>()));
+ measuredReplicator.replicate(new ReplicationTask(forTest(10)));
//THEN
//even containers should be failed, supposed to be zero
@@ -122,7 +123,7 @@ public class TestMeasuredReplicator {
@Test
public void testReplicationQueueTimeMetrics() {
final Instant queued = Instant.now().minus(1, ChronoUnit.SECONDS);
- ReplicationTask task = new ReplicationTask(100L, new ArrayList<>()) {
+ ReplicationTask task = new ReplicationTask(forTest(100)) {
@Override
public Instant getQueued() {
return queued;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index 2ea47f5972..6bc58d2270 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -82,8 +82,12 @@ public class TestReplicationSupervisor {
};
private final AtomicReference<ContainerReplicator> replicatorRef =
new AtomicReference<>();
- private final ContainerReplicator mutableReplicator =
+ private final AtomicReference<ContainerReplicator> pushReplicatorRef =
+ new AtomicReference<>();
+ private final ContainerReplicator pullReplicator =
task -> replicatorRef.get().replicate(task);
+ private final ContainerReplicator pushReplicator =
+ task -> pushReplicatorRef.get().replicate(task);
private ContainerSet set;
@@ -251,8 +255,8 @@ public class TestReplicationSupervisor {
@Test
public void testDownloadAndImportReplicatorFailure() throws IOException {
ReplicationSupervisor supervisor =
- new ReplicationSupervisor(set, context, mutableReplicator,
- newDirectExecutorService(), clock);
+ new ReplicationSupervisor(set, context, pullReplicator,
+ pushReplicator, newDirectExecutorService(), clock);
OzoneConfiguration conf = new OzoneConfiguration();
// Mock to fetch an exception in the importContainer method.
@@ -271,10 +275,12 @@ public class TestReplicationSupervisor {
Mockito.when(volumeSet.getVolumesList())
.thenReturn(Collections.singletonList(
new HddsVolume.Builder(testDir).conf(conf).build()));
- ContainerReplicator replicatorFactory =
- new DownloadAndImportReplicator(conf, set, null, moc, null, volumeSet);
+ ContainerImporter importer =
+ new ContainerImporter(conf, set, null, null, volumeSet);
+ ContainerReplicator replicator =
+ new DownloadAndImportReplicator(importer, moc);
- replicatorRef.set(replicatorFactory);
+ replicatorRef.set(replicator);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(DownloadAndImportReplicator.LOG);
@@ -340,8 +346,8 @@ public class TestReplicationSupervisor {
Function<ReplicationSupervisor, ContainerReplicator> replicatorFactory,
ExecutorService executor) {
ReplicationSupervisor supervisor =
- new ReplicationSupervisor(set, context, mutableReplicator, executor,
- clock);
+ new ReplicationSupervisor(set, context, pullReplicator, pushReplicator,
+ executor, clock);
replicatorRef.set(replicatorFactory.apply(supervisor));
return supervisor;
}
@@ -353,7 +359,7 @@ public class TestReplicationSupervisor {
private static ReplicateContainerCommand createCommand(long containerId) {
ReplicateContainerCommand cmd =
- new ReplicateContainerCommand(containerId, emptyList());
+ ReplicateContainerCommand.forTest(containerId);
cmd.setTerm(CURRENT_TERM);
return cmd;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index b73c04d399..85c7bc0f08 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -40,10 +40,9 @@ import
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointT
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import
org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
-import
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
import
org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.ozone.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
@@ -666,11 +665,10 @@ public class TestDatanodeUpgradeToScmHA {
* {@code containerID}.
*/
public void importContainer(long containerID, File source) throws Exception {
- DownloadAndImportReplicator replicator =
- new DownloadAndImportReplicator(dsm.getConf(),
+ ContainerImporter replicator =
+ new ContainerImporter(dsm.getConf(),
dsm.getContainer().getContainerSet(),
dsm.getContainer().getController(),
- new SimpleContainerDownloader(conf, null),
new TarContainerPacker(), dsm.getContainer().getVolumeSet());
File tempFile = tempFolder.newFile(
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 1b7fcad140..7312fd6a9e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -502,6 +502,16 @@ message CopyContainerResponseProto {
optional int64 checksum = 6;
}
+message SendContainerRequest {
+ required int64 containerID = 1;
+ required uint64 offset = 2;
+ required bytes data = 3;
+ optional int64 checksum = 4;
+}
+
+message SendContainerResponse {
+}
+
service XceiverClientProtocolService {
// A client-to-datanode RPC to send container commands
rpc send(stream ContainerCommandRequestProto) returns
@@ -512,4 +522,5 @@ service XceiverClientProtocolService {
service IntraDatanodeProtocolService {
// An intradatanode service to copy the raw container data between nodes
rpc download (CopyContainerRequestProto) returns (stream
CopyContainerResponseProto);
+ rpc upload (stream SendContainerRequest) returns (SendContainerResponse);
}
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 39d6a2931e..b36c3c89ff 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -415,6 +415,7 @@ message ReplicateContainerCommandProto {
repeated DatanodeDetailsProto sources = 2;
required int64 cmdId = 3;
optional int32 replicaIndex = 4;
+ optional DatanodeDetailsProto target = 5;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
index 5998c93134..129bddffb3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECMisReplicationHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -37,8 +36,8 @@ import java.util.Set;
public class ECMisReplicationHandler extends MisReplicationHandler {
public ECMisReplicationHandler(
PlacementPolicy<ContainerReplica> containerPlacement,
- ConfigurationSource conf, NodeManager nodeManager) {
- super(containerPlacement, conf, nodeManager);
+ ConfigurationSource conf, NodeManager nodeManager, boolean push) {
+ super(containerPlacement, conf, nodeManager, push);
}
@Override
@@ -56,15 +55,12 @@ public class ECMisReplicationHandler extends
MisReplicationHandler {
}
@Override
- protected ReplicateContainerCommand getReplicateCommand(
- ContainerInfo containerInfo, ContainerReplica replica) {
- final ReplicateContainerCommand replicateCommand =
- new ReplicateContainerCommand(containerInfo.getContainerID(),
- Collections.singletonList(replica.getDatanodeDetails()));
+ protected ReplicateContainerCommand updateReplicateCommand(
+ ReplicateContainerCommand command, ContainerReplica replica) {
// For EC containers, we need to track the replica index which is
// to be replicated, so add it to the command.
- replicateCommand.setReplicaIndex(replica.getReplicaIndex());
- return replicateCommand;
+ command.setReplicaIndex(replica.getReplicaIndex());
+ return command;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index a1e7f1a73f..49b29bd46c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -356,16 +356,7 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
replicas, selectedDatanodes, excludedNodes, decomIndexes);
break;
}
- DatanodeDetails decommissioningSrcNode
- = replica.getDatanodeDetails();
- final ReplicateContainerCommand replicateCommand =
- new ReplicateContainerCommand(container.getContainerID(),
- ImmutableList.of(decommissioningSrcNode));
- // For EC containers, we need to track the replica index which is
- // to be replicated, so add it to the command.
- replicateCommand.setReplicaIndex(replica.getReplicaIndex());
- DatanodeDetails target = iterator.next();
- commands.put(target, replicateCommand);
+ createReplicateCommand(commands, container, iterator, replica);
}
}
}
@@ -414,21 +405,30 @@ public class ECUnderReplicationHandler implements
UnhealthyReplicationHandler {
replicas, targets, excludedNodes, maintIndexes);
break;
}
- DatanodeDetails maintenanceSourceNode = replica.getDatanodeDetails();
- final ReplicateContainerCommand replicateCommand =
- new ReplicateContainerCommand(
- container.containerID().getProtobuf().getId(),
- ImmutableList.of(maintenanceSourceNode));
- // For EC containers we need to track the replica index which is
- // to be replicated, so add it to the command.
- replicateCommand.setReplicaIndex(replica.getReplicaIndex());
- DatanodeDetails target = iterator.next();
- commands.put(target, replicateCommand);
+ createReplicateCommand(commands, container, iterator, replica);
additionalMaintenanceCopiesNeeded -= 1;
}
}
}
+ private void createReplicateCommand(
+ Map<DatanodeDetails, SCMCommand<?>> commands,
+ ContainerInfo container, Iterator<DatanodeDetails> iterator,
+ ContainerReplica replica) {
+ final boolean push = replicationManager.getConfig().isPush();
+ DatanodeDetails source = replica.getDatanodeDetails();
+ DatanodeDetails target = iterator.next();
+ final long containerID = container.getContainerID();
+ final ReplicateContainerCommand replicateCommand = push
+ ? ReplicateContainerCommand.toTarget(containerID, target)
+ : ReplicateContainerCommand.fromSources(containerID,
+ ImmutableList.of(source));
+ // For EC containers, we need to track the replica index which is
+ // to be replicated, so add it to the command.
+ replicateCommand.setReplicaIndex(replica.getReplicaIndex());
+ commands.put(push ? source : target, replicateCommand);
+ }
+
private static byte[] int2byte(List<Integer> src) {
byte[] dst = new byte[src.size()];
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 80a61ee5db..3e3a34c0b4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -1474,21 +1474,24 @@ public class LegacyReplicationManager {
* datanode.
*
* @param container Container to be replicated
- * @param datanode The destination datanode to replicate
+ * @param target The destination datanode to replicate
* @param sources List of source nodes from where we can replicate
*/
private void sendReplicateCommand(final ContainerInfo container,
- final DatanodeDetails datanode,
+ final DatanodeDetails target,
final List<DatanodeDetails> sources) {
- LOG.info("Sending replicate container command for container {}" +
- " to datanode {} from datanodes {}",
- container.containerID(), datanode, sources);
-
final ContainerID id = container.containerID();
- final ReplicateContainerCommand replicateCommand =
- new ReplicateContainerCommand(id.getId(), sources);
- final boolean sent = sendAndTrackDatanodeCommand(datanode,
replicateCommand,
+ final long containerID = id.getId();
+ final boolean push = rmConf.isPush();
+ final ReplicateContainerCommand replicateCommand = push
+ ? ReplicateContainerCommand.toTarget(containerID, target)
+ : ReplicateContainerCommand.fromSources(containerID, sources);
+ final DatanodeDetails source = sources.get(0); // TODO randomize
+ final DatanodeDetails receiver = push ? source : target;
+ LOG.info("Sending {} to {}", replicateCommand, receiver);
+
+ final boolean sent = sendAndTrackDatanodeCommand(receiver,
replicateCommand,
action -> addInflight(InflightType.REPLICATION, id, action));
if (sent) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index b418b9236e..3570f45eb7 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -58,15 +58,18 @@ public abstract class MisReplicationHandler implements
private final PlacementPolicy<ContainerReplica> containerPlacement;
private final long currentContainerSize;
private final NodeManager nodeManager;
+ private boolean push;
public MisReplicationHandler(
final PlacementPolicy<ContainerReplica> containerPlacement,
- final ConfigurationSource conf, NodeManager nodeManager) {
+ final ConfigurationSource conf, NodeManager nodeManager,
+ final boolean push) {
this.containerPlacement = containerPlacement;
this.currentContainerSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
this.nodeManager = nodeManager;
+ this.push = push;
}
protected abstract ContainerReplicaCount getContainerReplicaCount(
@@ -108,8 +111,8 @@ public abstract class MisReplicationHandler implements
.collect(Collectors.toSet());
}
- protected abstract ReplicateContainerCommand getReplicateCommand(
- ContainerInfo containerInfo, ContainerReplica replica);
+ protected abstract ReplicateContainerCommand updateReplicateCommand(
+ ReplicateContainerCommand command, ContainerReplica replica);
private Map<DatanodeDetails, SCMCommand<?>> getReplicateCommands(
ContainerInfo containerInfo,
@@ -121,8 +124,15 @@ public abstract class MisReplicationHandler implements
if (datanodeIdx == targetDns.size()) {
break;
}
- commandMap.put(targetDns.get(datanodeIdx),
- getReplicateCommand(containerInfo, replica));
+ long containerID = containerInfo.getContainerID();
+ DatanodeDetails source = replica.getDatanodeDetails();
+ DatanodeDetails target = targetDns.get(datanodeIdx);
+ ReplicateContainerCommand replicateCommand = push
+ ? ReplicateContainerCommand.toTarget(containerID, target)
+ : ReplicateContainerCommand.fromSources(containerID,
+ Collections.singletonList(source));
+ replicateCommand = updateReplicateCommand(replicateCommand, replica);
+ commandMap.put(push ? source : target, replicateCommand);
datanodeIdx += 1;
}
return commandMap;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
index 7dfe50e955..d3652a635f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisMisReplicationHandler.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -39,8 +38,8 @@ public class RatisMisReplicationHandler extends
MisReplicationHandler {
public RatisMisReplicationHandler(
PlacementPolicy<ContainerReplica> containerPlacement,
- ConfigurationSource conf, NodeManager nodeManager) {
- super(containerPlacement, conf, nodeManager);
+ ConfigurationSource conf, NodeManager nodeManager, boolean push) {
+ super(containerPlacement, conf, nodeManager, push);
}
@Override
@@ -70,9 +69,8 @@ public class RatisMisReplicationHandler extends
MisReplicationHandler {
}
@Override
- protected ReplicateContainerCommand getReplicateCommand(
- ContainerInfo containerInfo, ContainerReplica replica) {
- return new ReplicateContainerCommand(containerInfo.getContainerID(),
- Collections.singletonList(replica.getDatanodeDetails()));
+ protected ReplicateContainerCommand updateReplicateCommand(
+ ReplicateContainerCommand command, ContainerReplica replica) {
+ return command;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index e5b2fe8bda..d9d96a566c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -36,6 +36,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -53,14 +54,17 @@ public class RatisUnderReplicationHandler
private final PlacementPolicy placementPolicy;
private final NodeManager nodeManager;
private final long currentContainerSize;
+ private final ReplicationManager replicationManager;
public RatisUnderReplicationHandler(final PlacementPolicy placementPolicy,
- final ConfigurationSource conf, final NodeManager nodeManager) {
+ final ConfigurationSource conf, final NodeManager nodeManager,
+ final ReplicationManager replicationManager) {
this.placementPolicy = placementPolicy;
this.currentContainerSize = (long) conf
.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
this.nodeManager = nodeManager;
+ this.replicationManager = replicationManager;
}
/**
@@ -230,11 +234,29 @@ public class RatisUnderReplicationHandler
private Map<DatanodeDetails, SCMCommand<?>> createReplicationCommands(
long containerID, List<DatanodeDetails> sources,
List<DatanodeDetails> targets) {
+ final boolean push = replicationManager.getConfig().isPush()
+ && targets.size() <= sources.size();
+ // TODO if we need multiple commands per source datanode, we need a small
+ // interface change
Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
- for (DatanodeDetails target : targets) {
- ReplicateContainerCommand command =
- new ReplicateContainerCommand(containerID, sources);
- commands.put(target, command);
+
+ if (push) {
+ Collections.shuffle(sources);
+ for (Iterator<DatanodeDetails> srcIter = sources.iterator(),
+ targetIter = targets.iterator();
+ srcIter.hasNext() && targetIter.hasNext();) {
+ DatanodeDetails source = srcIter.next();
+ DatanodeDetails target = targetIter.next();
+ ReplicateContainerCommand command =
+ ReplicateContainerCommand.toTarget(containerID, target);
+ commands.put(source, command);
+ }
+ } else {
+ for (DatanodeDetails target : targets) {
+ ReplicateContainerCommand command =
+ ReplicateContainerCommand.fromSources(containerID, sources);
+ commands.put(target, command);
+ }
}
return commands;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 535f0dde9d..911058f250 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -79,6 +79,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
@@ -225,9 +226,9 @@ public class ReplicationManager implements SCMService {
ecOverReplicationHandler =
new ECOverReplicationHandler(ecContainerPlacement, nodeManager);
ecMisReplicationHandler = new ECMisReplicationHandler(ecContainerPlacement,
- conf, nodeManager);
+ conf, nodeManager, rmConf.isPush());
ratisUnderReplicationHandler = new RatisUnderReplicationHandler(
- ratisContainerPlacement, conf, nodeManager);
+ ratisContainerPlacement, conf, nodeManager, this);
ratisOverReplicationHandler =
new RatisOverReplicationHandler(ratisContainerPlacement, nodeManager);
underReplicatedProcessor =
@@ -861,6 +862,16 @@ public class ReplicationManager implements SCMService {
)
private int maintenanceRemainingRedundancy = 1;
+ @Config(key = "push",
+ type = ConfigType.BOOLEAN,
+ defaultValue = "false",
+ tags = { SCM, DATANODE },
+      description = "If false, replication happens by asking the target to " +
+          "pull from source nodes. If true, the source node is asked to " +
+          "push to the target node."
+ )
+ private boolean push;
+
@PostConstruct
public void validate() {
if (!(commandDeadlineFactor > 0) || (commandDeadlineFactor > 1)) {
@@ -905,6 +916,10 @@ public class ReplicationManager implements SCMService {
public int getMaintenanceReplicaMinimum() {
return maintenanceReplicaMinimum;
}
+
+ public boolean isPush() {
+ return push;
+ }
}
@Override
@@ -952,6 +967,10 @@ public class ReplicationManager implements SCMService {
return metrics;
}
+ public ReplicationManagerConfiguration getConfig() {
+ return rmConf;
+ }
+
/**
* following functions will be refactored in a separate jira.
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
index 3332fc4e00..02cd592fe2 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECMisReplicationHandler.java
@@ -169,6 +169,7 @@ public class TestECMisReplicationHandler extends
TestMisReplicationHandler {
protected MisReplicationHandler getMisreplicationHandler(
PlacementPolicy placementPolicy, OzoneConfiguration conf,
NodeManager nodeManager) {
- return new ECMisReplicationHandler(placementPolicy, conf, nodeManager);
+ return new ECMisReplicationHandler(placementPolicy, conf, nodeManager,
+ false);
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 58989daa48..28961646e6 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -97,6 +97,10 @@ public class TestECUnderReplicationHandler {
}
};
replicationManager = Mockito.mock(ReplicationManager.class);
+ ReplicationManager.ReplicationManagerConfiguration rmConf =
+ new ReplicationManager.ReplicationManagerConfiguration();
+ Mockito.when(replicationManager.getConfig())
+ .thenReturn(rmConf);
conf = SCMTestUtils.getConf();
repConfig = new ECReplicationConfig(DATA, PARITY);
container = ReplicationTestUtil
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
index f960974256..874b4db2ba 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisMisReplicationHandler.java
@@ -173,6 +173,7 @@ public class TestRatisMisReplicationHandler extends
TestMisReplicationHandler {
protected MisReplicationHandler getMisreplicationHandler(
PlacementPolicy placementPolicy, OzoneConfiguration conf,
NodeManager nodeManager) {
- return new RatisMisReplicationHandler(placementPolicy, conf, nodeManager);
+ return new RatisMisReplicationHandler(placementPolicy, conf, nodeManager,
+ false);
}
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
index b2ccba1455..efed912e74 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisUnderReplicationHandler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -61,6 +62,7 @@ public class TestRatisUnderReplicationHandler {
private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
private PlacementPolicy policy;
+ private ReplicationManager replicationManager;
@Before
public void setup() throws NodeNotFoundException {
@@ -71,6 +73,9 @@ public class TestRatisUnderReplicationHandler {
conf = SCMTestUtils.getConf();
policy = ReplicationTestUtil
.getSimpleTestPlacementPolicy(nodeManager, conf);
+ replicationManager = Mockito.mock(ReplicationManager.class);
+ Mockito.when(replicationManager.getConfig())
+ .thenReturn(new ReplicationManagerConfiguration());
/*
Return NodeStatus with NodeOperationalState as specified in
@@ -183,7 +188,8 @@ public class TestRatisUnderReplicationHandler {
policy = ReplicationTestUtil.getNoNodesTestPlacementPolicy(nodeManager,
conf);
RatisUnderReplicationHandler handler =
- new RatisUnderReplicationHandler(policy, conf, nodeManager);
+ new RatisUnderReplicationHandler(policy, conf, nodeManager,
+ replicationManager);
Set<ContainerReplica> replicas
= createReplicas(container.containerID(), State.CLOSED, 0, 0);
@@ -211,7 +217,8 @@ public class TestRatisUnderReplicationHandler {
ContainerHealthResult healthResult,
int minHealthyForMaintenance, int expectNumCommands) throws IOException {
RatisUnderReplicationHandler handler =
- new RatisUnderReplicationHandler(policy, conf, nodeManager);
+ new RatisUnderReplicationHandler(policy, conf, nodeManager,
+ replicationManager);
Map<DatanodeDetails, SCMCommand<?>> commands =
handler.processAndCreateCommands(replicas, pendingOps,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index dd4a9e385c..2a2e6c68e6 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -576,7 +576,7 @@ public class TestReplicationManager {
sources.add(MockDatanodeDetails.randomDatanodeDetails());
- ReplicateContainerCommand command = new ReplicateContainerCommand(
+ ReplicateContainerCommand command = ReplicateContainerCommand.fromSources(
containerInfo.getContainerID(), sources);
command.setReplicaIndex(1);
@@ -611,7 +611,7 @@ public class TestReplicationManager {
containerInfo = ReplicationTestUtil.createContainerInfo(ratisRepConfig, 2,
HddsProtos.LifeCycleState.CLOSED, 10, 20);
- command = new ReplicateContainerCommand(
+ command = ReplicateContainerCommand.fromSources(
containerInfo.getContainerID(), sources);
replicationManager.sendDatanodeCommand(command, containerInfo, target);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index 5a7900e8ec..a56030811d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -111,7 +111,7 @@ public class TestUnderReplicatedProcessor {
List<DatanodeDetails> sourceDns = new ArrayList<>();
sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
- ReplicateContainerCommand rcc = new ReplicateContainerCommand(
+ ReplicateContainerCommand rcc = ReplicateContainerCommand.fromSources(
container.getContainerID(), sourceDns);
rcc.setReplicaIndex(3);
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index e241eff809..138e12e5f1 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -68,6 +68,7 @@
OZONE-SITE.XML_ozone.s3g.kerberos.principal=s3g/[email protected]
OZONE-SITE.XML_hdds.scm.replication.thread.interval=5s
OZONE-SITE.XML_hdds.scm.replication.event.timeout=10s
+OZONE-SITE.XML_hdds.scm.replication.push=true
OZONE-SITE.XML_ozone.scm.stale.node.interval=30s
OZONE-SITE.XML_ozone.scm.dead.node.interval=45s
OZONE-SITE.XML_hdds.container.report.interval=60s
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
index cfa34fc445..4e329ad305 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDatanodeProtocolServer.java
@@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
-import java.util.Collections;
import java.util.concurrent.TimeoutException;
/**
@@ -39,8 +38,7 @@ public class TestSCMDatanodeProtocolServer {
OzoneStorageContainerManager scm =
Mockito.mock(OzoneStorageContainerManager.class);
- ReplicateContainerCommand command = new ReplicateContainerCommand(1L,
- Collections.emptyList());
+ ReplicateContainerCommand command = ReplicateContainerCommand.forTest(1);
command.setTerm(5L);
command.setDeadline(1234L);
StorageContainerDatanodeProtocolProtos.SCMCommandProto proto =
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 1c0ba2a80c..7eabe6884c 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -35,6 +35,7 @@ import
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
import
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
@@ -128,7 +129,7 @@ public class ClosedContainerReplicator extends
BaseFreonGenerator implements
//replica.
if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
replicationTasks.add(new ReplicationTask(
- new ReplicateContainerCommand(container.getContainerID(),
+ ReplicateContainerCommand.fromSources(container.getContainerID(),
datanodesWithContainer)));
}
}
@@ -202,16 +203,16 @@ public class ClosedContainerReplicator extends
BaseFreonGenerator implements
ContainerController controller =
new ContainerController(containerSet, handlers);
- ContainerReplicator replicator =
- new DownloadAndImportReplicator(conf, containerSet,
- controller,
- new SimpleContainerDownloader(conf, null),
- new TarContainerPacker(), null);
+ ContainerImporter importer = new ContainerImporter(conf, containerSet,
+ controller, new TarContainerPacker(), null);
+ ContainerReplicator replicator = new DownloadAndImportReplicator(importer,
+ new SimpleContainerDownloader(conf, null));
ReplicationServer.ReplicationConfig replicationConfig
= conf.getObject(ReplicationServer.ReplicationConfig.class);
supervisor = new ReplicationSupervisor(containerSet, null,
- replicator, replicationConfig, Clock.system(ZoneId.systemDefault()));
+ replicator, null, replicationConfig,
+ Clock.system(ZoneId.systemDefault()));
}
private void replicateContainer(long counter) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]