This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to
refs/heads/HDDS-10239-container-reconciliation by this push:
new 0baba47daa HDDS-11254. Reconcile commands should be handled by
datanode ReplicationSupervisor (#7076)
0baba47daa is described below
commit 0baba47daaded042ba3cc4b2e7a342b5d53279dc
Author: Ethan Rose <[email protected]>
AuthorDate: Wed Aug 21 13:34:57 2024 -0400
HDDS-11254. Reconcile commands should be handled by datanode
ReplicationSupervisor (#7076)
---
.../checksum/ContainerChecksumTreeManager.java | 5 +-
.../checksum/ContainerMerkleTreeMetrics.java | 1 -
.../container/checksum/ReconcileContainerTask.java | 88 ++++++++++++++
.../ozone/container/common/interfaces/Handler.java | 12 +-
.../common/statemachine/DatanodeStateMachine.java | 7 +-
.../ReconcileContainerCommandHandler.java | 61 +++-------
.../ozone/container/keyvalue/KeyValueHandler.java | 10 +-
.../container/ozoneimpl/ContainerController.java | 7 +-
.../ozone/container/ozoneimpl/OzoneContainer.java | 9 +-
.../commands/ReconcileContainerCommand.java | 34 ++++--
.../checksum/TestReconcileContainerTask.java | 126 +++++++++++++++++++++
.../ozone/container/common/ContainerTestUtils.java | 48 ++++++++
.../container/common/TestBlockDeletingService.java | 34 ++----
.../TestSchemaOneBackwardsCompatibility.java | 11 +-
.../TestSchemaTwoBackwardsCompatibility.java | 4 +-
.../common/impl/TestContainerPersistence.java | 3 +-
.../container/common/impl/TestHddsDispatcher.java | 44 +------
.../container/common/interfaces/TestHandler.java | 18 +--
.../common/statemachine/TestStateContext.java | 2 +-
.../TestReconcileContainerCommandHandler.java | 84 ++++----------
.../states/endpoint/TestHeartbeatEndpointTask.java | 4 +-
.../container/keyvalue/TestKeyValueHandler.java | 28 ++---
.../TestKeyValueHandlerWithUnhealthyContainer.java | 3 +-
.../replication/TestGrpcReplicationService.java | 8 +-
.../ReconcileContainerEventHandler.java | 5 +-
.../container/metrics/TestContainerMetrics.java | 15 +--
.../container/server/TestContainerServer.java | 15 +--
.../server/TestSecureContainerServer.java | 16 +--
.../ozone/debug/container/ContainerCommands.java | 5 +-
.../ozone/freon/ClosedContainerReplicator.java | 5 +-
30 files changed, 416 insertions(+), 296 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index 7042531f57..3e18e16c40 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -60,10 +60,13 @@ public class ContainerChecksumTreeManager {
public ContainerChecksumTreeManager(ConfigurationSource conf) {
fileLock = SimpleStriped.readWriteLock(
conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(),
true);
- // TODO: TO unregister metrics on stop.
metrics = ContainerMerkleTreeMetrics.create();
}
+ public void stop() {
+ ContainerMerkleTreeMetrics.unregister();
+ }
+
/**
* Writes the specified container merkle tree to the specified container's checksum file.
* The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
index 3d76288616..c1bab5aa48 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
@@ -32,7 +32,6 @@ public class ContainerMerkleTreeMetrics {
public static ContainerMerkleTreeMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
- // TODO: Remove when checksum manager is moved from KeyValueHandler.
MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
if (source != null) {
ms.unregisterSource(METRICS_SOURCE_NAME);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java
new file mode 100644
index 0000000000..ac42efd45a
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Used to execute a container reconciliation task that has been queued from the ReplicationSupervisor.
+ */
+public class ReconcileContainerTask extends AbstractReplicationTask {
+ private final ReconcileContainerCommand command;
+ private final DNContainerOperationClient dnClient;
+ private final ContainerController controller;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReconcileContainerTask.class);
+
+ public ReconcileContainerTask(ContainerController controller,
+ DNContainerOperationClient dnClient, ReconcileContainerCommand command) {
+ super(command.getContainerID(), command.getDeadline(), command.getTerm());
+ this.command = command;
+ this.controller = controller;
+ this.dnClient = dnClient;
+ }
+
+ @Override
+ public void runTask() {
+ long start = Time.monotonicNow();
+
+ LOG.info("{}", this);
+
+ try {
+ controller.reconcileContainer(dnClient, command.getContainerID(),
command.getPeerDatanodes());
+ setStatus(Status.DONE);
+ long elapsed = Time.monotonicNow() - start;
+ LOG.info("{} completed in {} ms", this, elapsed);
+ } catch (Exception e) {
+ long elapsed = Time.monotonicNow() - start;
+ setStatus(Status.FAILED);
+ LOG.warn("{} failed in {} ms", this, elapsed, e);
+ }
+ }
+
+ @Override
+ protected Object getCommandForDebug() {
+ return command.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReconcileContainerTask that = (ReconcileContainerTask) o;
+ return Objects.equals(command, that.command);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getContainerId());
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 01435d8002..1579f4af8e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.ozone.container.common.interfaces;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
+import java.util.Set;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -30,6 +30,8 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -70,16 +72,17 @@ public abstract class Handler {
this.icrSender = icrSender;
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
public static Handler getHandlerForContainerType(
final ContainerType containerType, final ConfigurationSource config,
final String datanodeId, final ContainerSet contSet,
final VolumeSet volumeSet, final ContainerMetrics metrics,
- IncrementalReportSender<Container> icrSender) {
+ IncrementalReportSender<Container> icrSender,
ContainerChecksumTreeManager checksumManager) {
switch (containerType) {
case KeyValueContainer:
return new KeyValueHandler(config,
datanodeId, contSet, volumeSet, metrics,
- icrSender);
+ icrSender, checksumManager);
default:
throw new IllegalArgumentException("Handler for ContainerType: " +
containerType + "doesn't exist.");
@@ -199,7 +202,8 @@ public abstract class Handler {
* @param container container to be reconciled.
* @param peers The other datanodes with a copy of this container whose data should be checked.
*/
- public abstract void reconcileContainer(Container<?> container,
List<DatanodeDetails> peers) throws IOException;
+ public abstract void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
+ Set<DatanodeDetails> peers) throws IOException;
/**
* Deletes the given files associated with a block of the container.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a8deb9823d..888d1fd018 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -45,6 +45,7 @@ import
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.NettyMetrics;
import org.apache.hadoop.ozone.HddsDatanodeStopService;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import
org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
@@ -225,6 +226,10 @@ public class DatanodeStateMachine implements Closeable {
new ReconstructECContainersCommandHandler(conf, supervisor,
ecReconstructionCoordinator);
+ // TODO HDDS-11218 combine the clients used for reconstruction and reconciliation so they share the same cache of
+ // datanode clients.
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
certClient, secretKeyClient);
+
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
.build();
@@ -253,7 +258,7 @@ public class DatanodeStateMachine implements Closeable {
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
- .addHandler(new ReconcileContainerCommandHandler(threadNamePrefix))
+ .addHandler(new ReconcileContainerCommandHandler(supervisor, dnClient))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
index 9a4110c7df..99185a7e10 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
@@ -18,66 +18,38 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask;
import
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers.
*/
public class ReconcileContainerCommandHandler implements CommandHandler {
- private static final Logger LOG =
- LoggerFactory.getLogger(ReconcileContainerCommandHandler.class);
-
+ private final ReplicationSupervisor supervisor;
private final AtomicLong invocationCount;
- private final AtomicInteger queuedCount;
- private final ExecutorService executor;
- private long totalTime;
+ private final DNContainerOperationClient dnClient;
- public ReconcileContainerCommandHandler(String threadNamePrefix) {
- invocationCount = new AtomicLong(0);
- queuedCount = new AtomicInteger(0);
- // TODO Allow configurable thread pool size with a default value when the
implementation is ready.
- executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
- .build());
- totalTime = 0;
+ public ReconcileContainerCommandHandler(ReplicationSupervisor supervisor,
DNContainerOperationClient dnClient) {
+ this.supervisor = supervisor;
+ this.dnClient = dnClient;
+ this.invocationCount = new AtomicLong(0);
}
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context,
SCMConnectionManager connectionManager) {
- queuedCount.incrementAndGet();
- CompletableFuture.runAsync(() -> {
- invocationCount.incrementAndGet();
- long startTime = Time.monotonicNow();
- ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand)
command;
- LOG.info("Processing reconcile container command for container {} with
peers {}",
- reconcileCommand.getContainerID(),
reconcileCommand.getPeerDatanodes());
- try {
-
container.getController().reconcileContainer(reconcileCommand.getContainerID(),
- reconcileCommand.getPeerDatanodes());
- } catch (IOException ex) {
- LOG.error("Failed to reconcile container {}.",
reconcileCommand.getContainerID(), ex);
- } finally {
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
- }
- }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+ invocationCount.incrementAndGet();
+ ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand)
command;
+ supervisor.addTask(new ReconcileContainerTask(container.getController(),
dnClient, reconcileCommand));
}
@Override
@@ -90,21 +62,20 @@ public class ReconcileContainerCommandHandler implements
CommandHandler {
return (int)invocationCount.get();
}
+ // Uses ReplicationSupervisor for these metrics.
+
@Override
public long getAverageRunTime() {
- if (invocationCount.get() > 0) {
- return totalTime / invocationCount.get();
- }
return 0;
}
@Override
public long getTotalRunTime() {
- return totalTime;
+ return 0;
}
@Override
public int getQueuedCount() {
- return queuedCount.get();
+ return 0;
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 9b888a9c1e..4b635194cd 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -159,14 +161,15 @@ public class KeyValueHandler extends Handler {
ContainerSet contSet,
VolumeSet volSet,
ContainerMetrics metrics,
- IncrementalReportSender<Container> icrSender) {
+ IncrementalReportSender<Container> icrSender,
+ ContainerChecksumTreeManager checksumManager) {
super(config, datanodeId, contSet, volSet, metrics, icrSender);
blockManager = new BlockManagerImpl(config);
validateChunkChecksumData = conf.getObject(
DatanodeConfiguration.class).isChunkDataValidationCheck();
chunkManager = ChunkManagerFactory.createChunkManager(config, blockManager,
volSet);
- checksumManager = new ContainerChecksumTreeManager(config);
+ this.checksumManager = checksumManager;
try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf);
} catch (Exception e) {
@@ -1303,7 +1306,8 @@ public class KeyValueHandler extends Handler {
}
@Override
- public void reconcileContainer(Container<?> container, List<DatanodeDetails>
peers) throws IOException {
+ public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
+ Set<DatanodeDetails> peers) throws
IOException {
// TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
ContainerData data = container.getContainerData();
long id = data.getContainerID();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 47b503ee05..6a1ceef0c0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -25,6 +25,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -39,7 +40,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -215,12 +215,13 @@ public class ContainerController {
}
}
- public void reconcileContainer(long containerID, List<DatanodeDetails>
peers) throws IOException {
+ public void reconcileContainer(DNContainerOperationClient dnClient, long
containerID, Set<DatanodeDetails> peers)
+ throws IOException {
Container<?> container = containerSet.getContainer(containerID);
if (container == null) {
LOG.warn("Container {} to reconcile not found on this datanode.",
containerID);
} else {
- getHandler(container).reconcileContainer(container, peers);
+ getHandler(container).reconcileContainer(dnClient, container, peers);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 6a6ac8bb35..f8034244c1 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -34,7 +34,6 @@ import
org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
-import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeMetrics;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
@@ -188,12 +187,13 @@ public class OzoneContainer {
}
};
+ checksumTreeManager = new ContainerChecksumTreeManager(config);
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
- containerSet, volumeSet, metrics, icrSender));
+ containerSet, volumeSet, metrics, icrSender,
checksumTreeManager));
}
SecurityConfig secConf = new SecurityConfig(conf);
@@ -226,8 +226,6 @@ public class OzoneContainer {
Duration blockDeletingSvcInterval = conf.getObject(
DatanodeConfiguration.class).getBlockDeletionInterval();
- checksumTreeManager = new ContainerChecksumTreeManager(config);
-
long blockDeletingServiceTimeout = config
.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
@@ -500,8 +498,7 @@ public class OzoneContainer {
blockDeletingService.shutdown();
recoveringContainerScrubbingService.shutdown();
ContainerMetrics.remove();
- // TODO: To properly shut down ContainerMerkleTreeMetrics
- ContainerMerkleTreeMetrics.unregister();
+ checksumTreeManager.stop();
}
public void handleVolumeFailures() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
index cdd4522cc6..3d24756a40 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
@@ -25,18 +25,20 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
-import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
/**
* Asks datanodes to reconcile the specified container with other container replicas.
*/
public class ReconcileContainerCommand extends
SCMCommand<ReconcileContainerCommandProto> {
- private final List<DatanodeDetails> peerDatanodes;
+ private final Set<DatanodeDetails> peerDatanodes;
- public ReconcileContainerCommand(long containerID, List<DatanodeDetails>
peerDatanodes) {
+ public ReconcileContainerCommand(long containerID, Set<DatanodeDetails>
peerDatanodes) {
// Container ID serves as command ID, since only one reconciliation should be in progress at a time.
super(containerID);
this.peerDatanodes = peerDatanodes;
@@ -58,7 +60,7 @@ public class ReconcileContainerCommand extends
SCMCommand<ReconcileContainerComm
return builder.build();
}
- public List<DatanodeDetails> getPeerDatanodes() {
+ public Set<DatanodeDetails> getPeerDatanodes() {
return peerDatanodes;
}
@@ -70,11 +72,11 @@ public class ReconcileContainerCommand extends
SCMCommand<ReconcileContainerComm
Preconditions.checkNotNull(protoMessage);
List<HddsProtos.DatanodeDetailsProto> peers = protoMessage.getPeersList();
- List<DatanodeDetails> peerNodes = !peers.isEmpty()
+ Set<DatanodeDetails> peerNodes = !peers.isEmpty()
? peers.stream()
.map(DatanodeDetails::getFromProtoBuf)
- .collect(Collectors.toList())
- : emptyList();
+ .collect(Collectors.toSet())
+ : emptySet();
return new ReconcileContainerCommand(protoMessage.getContainerID(),
peerNodes);
}
@@ -85,4 +87,22 @@ public class ReconcileContainerCommand extends
SCMCommand<ReconcileContainerComm
": containerId=" + getContainerID() +
", peerNodes=" + peerDatanodes;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReconcileContainerCommand that = (ReconcileContainerCommand) o;
+ return getContainerID() == that.getContainerID() &&
+ Objects.equals(peerDatanodes, that.peerDatanodes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getContainerID(), peerDatanodes);
+ }
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java
new file mode 100644
index 0000000000..04d08347ed
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestReconcileContainerTask.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+class TestReconcileContainerTask {
+ private DNContainerOperationClient mockClient;
+ private ContainerController mockController;
+
+ @BeforeEach
+ public void init() {
+ mockClient = mock(DNContainerOperationClient.class);
+ mockController = mock(ContainerController.class);
+ }
+
+ @Test
+ public void testFailedTaskStatus() throws Exception {
+ doThrow(IOException.class).when(mockController).reconcileContainer(any(),
anyLong(), any());
+ ReconcileContainerTask task = new ReconcileContainerTask(mockController,
mockClient,
+ new ReconcileContainerCommand(1, Collections.emptySet()));
+
+ assertEquals(AbstractReplicationTask.Status.QUEUED, task.getStatus());
+ task.runTask();
+ assertEquals(AbstractReplicationTask.Status.FAILED, task.getStatus());
+ }
+
+ @Test
+ public void testSuccessfulTaskStatus() {
+ ReconcileContainerTask task = new ReconcileContainerTask(mockController,
mockClient,
+ new ReconcileContainerCommand(1, Collections.emptySet()));
+
+ assertEquals(AbstractReplicationTask.Status.QUEUED, task.getStatus());
+ task.runTask();
+ assertEquals(AbstractReplicationTask.Status.DONE, task.getStatus());
+ }
+
+ @Test
+ public void testEqualityWhenContainerIDsMatch() {
+ final long containerID = 1;
+ final UUID dnID1 = UUID.randomUUID();
+
+ Set<DatanodeDetails> peerSet1 = new HashSet<>();
+ peerSet1.add(buildDn(dnID1));
+ Set<DatanodeDetails> peerSet1Other = new HashSet<>();
+ peerSet1Other.add(buildDn(dnID1));
+ Set<DatanodeDetails> peerSet2 = new HashSet<>();
+ peerSet2.add(buildDn());
+
+ ReconcileContainerTask peerSet1Task = new
ReconcileContainerTask(mockController, mockClient,
+ new ReconcileContainerCommand(containerID, peerSet1));
+ ReconcileContainerTask otherPeerSet1Task = new
ReconcileContainerTask(mockController, mockClient,
+ new ReconcileContainerCommand(containerID, peerSet1Other));
+ ReconcileContainerTask peerSet2Task = new
ReconcileContainerTask(mockController, mockClient,
+ new ReconcileContainerCommand(containerID, peerSet2));
+
+ // Same container ID and peers.
+ assertEquals(peerSet1Task, otherPeerSet1Task);
+ // Same container ID, different peers.
+ assertNotEquals(peerSet1Task, peerSet2Task);
+ }
+
+ @Test
+ public void testEqualityWhenContainerIDsDifferent() {
+ Set<DatanodeDetails> peerSet = new HashSet<>();
+ peerSet.add(buildDn());
+
+ ReconcileContainerTask id1Task = new
ReconcileContainerTask(mockController, mockClient,
+ new ReconcileContainerCommand(1, peerSet));
+ ReconcileContainerTask id2Task = new
ReconcileContainerTask(mockController, mockClient,
+ new ReconcileContainerCommand(2, peerSet));
+ ReconcileContainerTask id2NoPeersTask = new
ReconcileContainerTask(mockController, mockClient,
+ new ReconcileContainerCommand(2, Collections.emptySet()));
+
+ // Different container ID, same peers.
+ assertNotEquals(id1Task, id2Task);
+ // Different container ID, different peers.
+ assertNotEquals(id1Task, id2NoPeersTask);
+ }
+
+ private DatanodeDetails buildDn(UUID id) {
+ return DatanodeDetails.newBuilder()
+ .setUuid(id)
+ .build();
+ }
+
+ private DatanodeDetails buildDn() {
+ return DatanodeDetails.newBuilder()
+ .setUuid(UUID.randomUUID())
+ .build();
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index b5b578554b..d4c00df38b 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.container.common;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -27,6 +28,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -36,9 +38,12 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -54,8 +59,10 @@ import
org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -73,6 +80,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -179,6 +187,46 @@ public final class ContainerTestUtils {
return new KeyValueContainer(kvData, new OzoneConfiguration());
}
+ /**
+  * Creates a {@link KeyValueHandler} for tests that provide their own metrics but do not need an ICR sender
+  * or a particular {@link ContainerChecksumTreeManager}. The handler is constructed with a no-op ICR sender
+  * and a new checksum tree manager created from {@code config}.
+  */
+ public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
+     String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
+   ContainerChecksumTreeManager checksumManager = new ContainerChecksumTreeManager(config);
+   return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c -> { }, checksumManager);
+ }
+
+ /**
+  * Creates a {@link KeyValueHandler} for tests that do not need an ICR sender, their own metrics, or a
+  * {@link ContainerChecksumTreeManager}. Metrics are created from {@code config} and the call is forwarded
+  * to the overload that accepts metrics.
+  */
+ public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
+     String datanodeId, ContainerSet contSet, VolumeSet volSet) {
+   ContainerMetrics defaultMetrics = ContainerMetrics.create(config);
+   return getKeyValueHandler(config, datanodeId, contSet, volSet, defaultMetrics);
+ }
+
+ /**
+  * Builds an {@link HddsDispatcher} for tests, passing {@code null} as the token verifier.
+  */
+ public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf,
+     ContainerSet contSet,
+     VolumeSet volSet,
+     StateContext context) {
+   final TokenVerifier noVerifier = null;
+   return getHddsDispatcher(conf, contSet, volSet, context, noVerifier);
+ }
+
+ /**
+  * Builds an {@link HddsDispatcher} for tests with a single handler registered for
+  * {@link ContainerType#KeyValueContainer}. The handler's datanode ID is read from the datanode details of
+  * {@code context}'s parent, so {@code context} must not be null.
+  */
+ public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf,
+     ContainerSet contSet,
+     VolumeSet volSet,
+     StateContext context, TokenVerifier verifier) {
+   ContainerMetrics dispatcherMetrics = ContainerMetrics.create(conf);
+   String datanodeId = context.getParent().getDatanodeDetails().getUuidString();
+   KeyValueHandler kvHandler = getKeyValueHandler(conf, datanodeId, contSet, volSet, dispatcherMetrics);
+
+   Map<ContainerType, Handler> handlersByType = Maps.newHashMap();
+   handlersByType.put(ContainerType.KeyValueContainer, kvHandler);
+   // Only one handler is registered above; fail if the enum ever has more than one container type.
+   assertEquals(1, ContainerType.values().length, "Tests only cover KeyValueContainer type");
+   return new HddsDispatcher(conf, contSet, volSet, handlersByType, context, dispatcherMetrics, verifier);
+ }
+
public static void enableSchemaV3(OzoneConfiguration conf) {
DatanodeConfiguration dc = conf.getObject(DatanodeConfiguration.class);
dc.setContainerSchemaV3Enabled(true);
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index ab313d0ce6..fd0c81fb02 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -481,9 +481,7 @@ public class TestBlockDeletingService {
// runs so we can trigger it manually.
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
OzoneContainer ozoneContainer =
mockDependencies(containerSet, keyValueHandler);
BlockDeletingService svc = new BlockDeletingService(ozoneContainer,
@@ -550,9 +548,7 @@ public class TestBlockDeletingService {
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
BlockDeletingServiceTestImpl svc =
getBlockDeletingService(containerSet, conf, keyValueHandler);
svc.start();
@@ -684,9 +680,7 @@ public class TestBlockDeletingService {
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
BlockDeletingServiceTestImpl svc =
getBlockDeletingService(containerSet, conf, keyValueHandler);
svc.start();
@@ -792,9 +786,7 @@ public class TestBlockDeletingService {
createToDeleteBlocks(containerSet, 1, 100, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf, keyValueHandler);
service.start();
@@ -822,9 +814,7 @@ public class TestBlockDeletingService {
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
// set timeout value as 1ns to trigger timeout behavior
long timeout = 1;
OzoneContainer ozoneContainer =
@@ -929,9 +919,7 @@ public class TestBlockDeletingService {
chunksPerBlock);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf, keyValueHandler);
service.start();
@@ -988,9 +976,7 @@ public class TestBlockDeletingService {
createToDeleteBlocks(containerSet, containerCount, blocksPerContainer,
chunksPerBlock);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- ContainerMetrics.create(conf), c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet);
BlockDeletingServiceTestImpl service =
getBlockDeletingService(containerSet, conf, keyValueHandler);
service.start();
@@ -1047,9 +1033,7 @@ public class TestBlockDeletingService {
ContainerSet containerSet = new ContainerSet(1000);
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet, metrics);
int containerCount = 5;
int blocksPerContainer = 3;
createToDeleteBlocks(containerSet, containerCount,
@@ -1119,7 +1103,7 @@ public class TestBlockDeletingService {
ContainerSet containerSet = new ContainerSet(1000);
KeyValueContainerData contData = createToDeleteBlocks(containerSet,
numBlocks, 4);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
ContainerMetrics.create(conf), c -> { });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet);
BlockDeletingServiceTestImpl svc =
getBlockDeletingService(containerSet, conf, keyValueHandler);
svc.start();
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index ad5ca48218..7774604127 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
@@ -279,11 +278,8 @@ public class TestSchemaOneBackwardsCompatibility {
ContainerSet containerSet = makeContainerSet();
VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet);
long initialTotalSpace = newKvData().getBytesUsed();
long blockSpace = initialTotalSpace / TestDB.KEY_COUNT;
@@ -352,11 +348,8 @@ public class TestSchemaOneBackwardsCompatibility {
ContainerSet containerSet = makeContainerSet();
VolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
- new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
- metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanodeUuid,
containerSet, volumeSet);
KeyValueContainerData cData = newKvData();
try (DBHandle refCountedDB = BlockUtils.getDB(cData, conf)) {
// Read blocks that were already deleted before the upgrade.
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
index 0c4612b79f..da0d2384ab 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaTwoBackwardsCompatibility.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
@@ -135,8 +134,7 @@ public class TestSchemaTwoBackwardsCompatibility {
chunkManager = new FilePerBlockStrategy(true, blockManager, volumeSet);
containerSet = new ContainerSet(1000);
- keyValueHandler = new KeyValueHandler(conf, datanodeUuid,
- containerSet, volumeSet, ContainerMetrics.create(conf), c -> { });
+ keyValueHandler = ContainerTestUtils.getKeyValueHandler(conf,
datanodeUuid, containerSet, volumeSet);
ozoneContainer = mock(OzoneContainer.class);
when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
when(ozoneContainer.getWriteChannel()).thenReturn(null);
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 3ff8f9e625..993179d1b7 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -311,7 +312,7 @@ public class TestContainerPersistence {
KeyValueHandler kvHandler = new KeyValueHandler(conf,
datanodeId, containerSet, volumeSet, metrics,
- c -> icrReceived.incrementAndGet());
+ c -> icrReceived.incrementAndGet(), new
ContainerChecksumTreeManager(conf));
Exception exception = assertThrows(
StorageContainerException.class,
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 1cbd6ee470..05bebdd1b9 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.impl;
-import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
@@ -34,7 +33,6 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -48,8 +46,6 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -78,7 +74,6 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -113,10 +108,6 @@ public class TestHddsDispatcher {
@TempDir
private File testDir;
- public static final IncrementalReportSender<Container> NO_OP_ICR_SENDER =
- c -> {
- };
-
@ContainerLayoutTestInfo.ContainerTest
public void testContainerCloseActionWhenFull(
ContainerLayoutVersion layout) throws IOException {
@@ -143,16 +134,7 @@ public class TestHddsDispatcher {
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
containerSet.addContainer(container);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
- Map<ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerType containerType : ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(containerType, conf,
- context.getParent().getDatanodeDetails().getUuidString(),
- containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
- }
- HddsDispatcher hddsDispatcher = new HddsDispatcher(
- conf, containerSet, volumeSet, handlers, context, metrics, null);
+ HddsDispatcher hddsDispatcher =
ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context);
hddsDispatcher.setClusterId(scmId.toString());
ContainerCommandResponseProto responseOne = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
@@ -279,16 +261,7 @@ public class TestHddsDispatcher {
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
containerSet.addContainer(container);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
- Map<ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerType containerType : ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(containerType, conf,
- context.getParent().getDatanodeDetails().getUuidString(),
- containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
- }
- HddsDispatcher hddsDispatcher = new HddsDispatcher(
- conf, containerSet, volumeSet, handlers, context, metrics, null);
+ HddsDispatcher hddsDispatcher =
ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet, context);
hddsDispatcher.setClusterId(scmId.toString());
containerData.getVolume().getVolumeInfo()
.ifPresent(volumeInfo -> volumeInfo.incrementUsedSpace(50));
@@ -528,17 +501,8 @@ public class TestHddsDispatcher {
}
});
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
- Map<ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerType containerType : ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(containerType, conf,
- context.getParent().getDatanodeDetails().getUuidString(),
- containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
- }
-
- final HddsDispatcher hddsDispatcher = new HddsDispatcher(conf,
- containerSet, volumeSet, handlers, context, metrics, tokenVerifier);
+ final HddsDispatcher hddsDispatcher =
+ ContainerTestUtils.getHddsDispatcher(conf, containerSet, volumeSet,
context, tokenVerifier);
hddsDispatcher.setClusterId(scmId.toString());
return hddsDispatcher;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
index 8f2ad307e8..27257d5a0e 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.ozone.container.common.interfaces;
-import java.util.Map;
-
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -27,13 +25,11 @@ import
org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
-import com.google.common.collect.Maps;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -62,19 +58,7 @@ public class TestHandler {
DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
StateContext context = ContainerTestUtils.getMockContext(
datanodeDetails, conf);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
- Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerProtos.ContainerType containerType :
- ContainerProtos.ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(
- containerType, conf,
- context.getParent().getDatanodeDetails().getUuidString(),
- containerSet, volumeSet, metrics,
- TestHddsDispatcher.NO_OP_ICR_SENDER));
- }
- this.dispatcher = new HddsDispatcher(
- conf, containerSet, volumeSet, handlers, null, metrics, null);
+ this.dispatcher = ContainerTestUtils.getHddsDispatcher(conf, containerSet,
volumeSet, context);
}
@AfterEach
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 6933400fba..fd4614335e 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -705,7 +705,7 @@ public class TestStateContext {
ctx.addCommand(ReplicateContainerCommand.forTest(3));
ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
- ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptyList()));
+ ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptySet()));
Map<SCMCommandProto.Type, Integer> summary = ctx.getCommandQueueSummary();
assertEquals(3,
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
index d6be667f41..f27ed097d2 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
@@ -24,6 +24,9 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.checksum.ReconcileContainerTask;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -39,16 +42,14 @@ import
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
-import org.apache.ozone.test.GenericTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -56,7 +57,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -78,7 +80,14 @@ public class TestReconcileContainerCommandHandler {
OzoneConfiguration conf = new OzoneConfiguration();
DatanodeDetails dnDetails = randomDatanodeDetails();
- subject = new ReconcileContainerCommandHandler("");
+
+ ReplicationSupervisor mockSupervisor = mock(ReplicationSupervisor.class);
+ doAnswer(invocation -> {
+ ((ReconcileContainerTask)invocation.getArguments()[0]).runTask();
+ return null;
+ }).when(mockSupervisor).addTask(any());
+
+ subject = new ReconcileContainerCommandHandler(mockSupervisor,
mock(DNContainerOperationClient.class));
context = ContainerTestUtils.getMockContext(dnDetails, conf);
containerSet = new ContainerSet(1000);
@@ -91,7 +100,7 @@ public class TestReconcileContainerCommandHandler {
assertEquals(NUM_CONTAINERS, containerSet.containerCount());
Handler containerHandler = new KeyValueHandler(new OzoneConfiguration(),
dnDetails.getUuidString(), containerSet,
- mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender);
+ mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender, new
ContainerChecksumTreeManager(conf));
ContainerController controller = new ContainerController(containerSet,
singletonMap(ContainerProtos.ContainerType.KeyValueContainer,
containerHandler));
ozoneContainer = mock(OzoneContainer.class);
@@ -114,80 +123,37 @@ public class TestReconcileContainerCommandHandler {
init(layout, icrSender);
for (int id = 1; id <= NUM_CONTAINERS; id++) {
- ReconcileContainerCommand cmd = new ReconcileContainerCommand(id,
Collections.emptyList());
+ ReconcileContainerCommand cmd = new ReconcileContainerCommand(id,
Collections.emptySet());
subject.handle(cmd, ozoneContainer, context, null);
}
// An unknown container should not trigger a container report being sent.
ReconcileContainerCommand unknownContainerCmd = new
ReconcileContainerCommand(NUM_CONTAINERS + 1,
- Collections.emptyList());
+ Collections.emptySet());
subject.handle(unknownContainerCmd, ozoneContainer, context, null);
- waitForAllCommandsToFinish();
+ // Since the replication supervisor is mocked in this test, reports are
processed immediately.
verifyAllContainerReports(containerReportsSent);
}
+ /**
+ * Most metrics are handled by the ReplicationSupervisor. Only check the
individual metrics here.
+ */
@ContainerLayoutTestInfo.ContainerTest
public void testReconcileContainerCommandMetrics(ContainerLayoutVersion
layout) throws Exception {
- // Used to block ICR sending so that queue metrics can be checked before
the reconcile task completes.
- CountDownLatch icrLatch = new CountDownLatch(1);
- // Wait this long before completing the task.
- // This provides a lower bound on execution time.
- final long minExecTimeMillis = 500;
- // This is the lower bound on execution time of all the commands combined.
- final long expectedTotalMinExecTimeMillis = minExecTimeMillis *
NUM_CONTAINERS;
+ init(layout, c -> { });
- IncrementalReportSender<Container> icrSender = c -> {
- try {
- // Block the caller until the latch is counted down.
- // Caller can check queue metrics in the meantime.
- LOG.info("ICR sender waiting for latch");
- assertTrue(icrLatch.await(30, TimeUnit.SECONDS));
- LOG.info("ICR sender proceeding after latch");
-
- Thread.sleep(minExecTimeMillis);
- } catch (Exception ex) {
- LOG.error("ICR sender failed", ex);
- }
- };
-
- init(layout, icrSender);
+ assertEquals(0, subject.getInvocationCount());
// All commands submitted will be blocked until the latch is counted down.
for (int id = 1; id <= NUM_CONTAINERS; id++) {
- ReconcileContainerCommand cmd = new ReconcileContainerCommand(id,
Collections.emptyList());
+ ReconcileContainerCommand cmd = new ReconcileContainerCommand(id,
Collections.emptySet());
subject.handle(cmd, ozoneContainer, context, null);
}
- assertEquals(NUM_CONTAINERS, subject.getQueuedCount());
- assertEquals(0, subject.getTotalRunTime());
- assertEquals(0, subject.getAverageRunTime());
-
- // This will resume handling of the tasks.
- icrLatch.countDown();
- waitForAllCommandsToFinish();
-
assertEquals(NUM_CONTAINERS, subject.getInvocationCount());
- long totalRunTime = subject.getTotalRunTime();
- assertTrue(totalRunTime >= expectedTotalMinExecTimeMillis,
- "Total run time " + totalRunTime + "ms was not larger than the minimum
total exec time " +
- expectedTotalMinExecTimeMillis + "ms");
- long avgRunTime = subject.getAverageRunTime();
- assertTrue(avgRunTime >= minExecTimeMillis,
- "Average run time " + avgRunTime + "ms was not larger than the minimum
per task exec time " +
- minExecTimeMillis + "ms");
- }
-
- private void waitForAllCommandsToFinish() throws Exception {
- // Queue count should be decremented only after the task completes, so the
other metrics should be consistent when
- // it reaches zero.
- GenericTestUtils.waitFor(() -> {
- int qCount = subject.getQueuedCount();
- LOG.info("Waiting for queued command count to reach 0. Currently at " +
qCount);
- return qCount == 0;
- }, 500, 3000);
}
- private void verifyAllContainerReports(Map<ContainerID,
ContainerReplicaProto> reportsSent) throws Exception {
+ private void verifyAllContainerReports(Map<ContainerID,
ContainerReplicaProto> reportsSent) {
assertEquals(NUM_CONTAINERS, reportsSent.size());
for (Map.Entry<ContainerID, ContainerReplicaProto> entry:
reportsSent.entrySet()) {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 6245489f13..49b109b913 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -33,9 +33,11 @@ import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.UUID;
import com.google.protobuf.Proto2Utils;
@@ -118,7 +120,7 @@ public class TestHeartbeatEndpointTask {
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
- List<DatanodeDetails> peerDNs = new ArrayList<>();
+ Set<DatanodeDetails> peerDNs = new HashSet<>();
peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
ReconcileContainerCommand cmd = new ReconcileContainerCommand(1, peerDNs);
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 4527ee6d51..8e5f7f01e7 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -41,6 +41,8 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
+import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -48,6 +50,7 @@ import
org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -275,16 +278,11 @@ public class TestKeyValueHandler {
null, StorageVolume.VolumeType.DATA_VOLUME, null);
try {
ContainerSet cset = new ContainerSet(1000);
- int[] interval = new int[1];
- interval[0] = 2;
- ContainerMetrics metrics = new ContainerMetrics(interval);
DatanodeDetails datanodeDetails = mock(DatanodeDetails.class);
StateContext context = ContainerTestUtils.getMockContext(
datanodeDetails, conf);
- KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
- context.getParent().getDatanodeDetails().getUuidString(), cset,
- volumeSet, metrics, c -> {
- });
+ KeyValueHandler keyValueHandler =
ContainerTestUtils.getKeyValueHandler(conf,
+ context.getParent().getDatanodeDetails().getUuidString(), cset,
volumeSet);
assertEquals("org.apache.hadoop.ozone.container.common" +
".volume.CapacityVolumeChoosingPolicy",
keyValueHandler.getVolumeChoosingPolicyForTesting()
@@ -294,8 +292,8 @@ public class TestKeyValueHandler {
conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
"org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");
RuntimeException exception = assertThrows(RuntimeException.class,
- () -> new KeyValueHandler(conf,
context.getParent().getDatanodeDetails().getUuidString(), cset, volumeSet,
- metrics, c -> { }));
+ () -> ContainerTestUtils.getKeyValueHandler(conf,
context.getParent().getDatanodeDetails().getUuidString(),
+ cset, volumeSet));
assertThat(exception).hasMessageEndingWith(
"class org.apache.hadoop.ozone.container.common.impl.HddsDispatcher
" +
@@ -385,7 +383,7 @@ public class TestKeyValueHandler {
final KeyValueHandler kvHandler = new KeyValueHandler(conf,
datanodeId, containerSet, volumeSet, metrics,
- c -> icrReceived.incrementAndGet());
+ c -> icrReceived.incrementAndGet(), new
ContainerChecksumTreeManager(conf));
kvHandler.setClusterID(clusterId);
final ContainerCommandRequestProto createContainer =
@@ -459,8 +457,7 @@ public class TestKeyValueHandler {
// Allows checking the invocation count of the lambda.
AtomicInteger icrCount = new AtomicInteger(0);
- KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
randomDatanodeDetails().getUuidString(), containerSet,
- mock(MutableVolumeSet.class), mock(ContainerMetrics.class), c -> {
+ IncrementalReportSender<Container> icrSender = c -> {
// Check that the ICR contains expected info about the container.
ContainerReplicaProto report = c.getContainerReport();
long reportedID = report.getContainerID();
@@ -470,11 +467,14 @@ public class TestKeyValueHandler {
Assertions.assertNotEquals(0, reportDataChecksum,
"Container report should have populated the checksum field with a
non-zero value.");
icrCount.incrementAndGet();
- });
+ };
+
+ KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
randomDatanodeDetails().getUuidString(), containerSet,
+ mock(MutableVolumeSet.class), mock(ContainerMetrics.class), icrSender,
new ContainerChecksumTreeManager(conf));
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(container, Collections.emptyList());
+ keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
Assertions.assertEquals(1, icrCount.get());
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index f0c8a2077e..8207993218 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
@@ -254,7 +255,7 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
stateMachine.getDatanodeDetails().getUuidString(),
mock(ContainerSet.class),
mock(MutableVolumeSet.class),
- mock(ContainerMetrics.class), mockIcrSender);
+ mock(ContainerMetrics.class), mockIcrSender,
mock(ContainerChecksumTreeManager.class));
}
private KeyValueContainer getMockUnhealthyContainer() {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 03901b99be..b8c43460ba 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -25,7 +25,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContai
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -35,7 +35,6 @@ import
org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.junit.jupiter.api.AfterEach;
@@ -129,11 +128,8 @@ class TestGrpcReplicationService {
when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(
new HddsVolume.Builder(testDir).conf(conf).build()));
- ContainerMetrics metrics = ContainerMetrics.create(conf);
Handler containerHandler =
- new KeyValueHandler(conf, datanode.getUuidString(), containerSet,
- volumeSet, metrics, c -> {
- });
+ ContainerTestUtils.getKeyValueHandler(conf, datanode.getUuidString(),
containerSet, volumeSet);
containerController = new ContainerController(containerSet,
Collections.singletonMap(
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
index f13b37f3ee..8e8b5bf71c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
@@ -32,7 +32,6 @@ import
org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -78,9 +77,9 @@ public class ReconcileContainerEventHandler implements
EventHandler<ContainerID>
LOG.info("Reconcile container event triggered for container {} with
peers {}", containerID, allReplicaNodes);
for (DatanodeDetails replica : allReplicaNodes) {
- List<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
+ Set<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
.filter(other -> !other.equals(replica))
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
ReconcileContainerCommand command = new
ReconcileContainerCommand(containerID.getId(), otherReplicas);
command.setTerm(scmContext.getTermOfLeader());
publisher.fireEvent(DATANODE_COMMAND, new
CommandForDatanode<>(replica.getUuid(), command));
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 068cb01a96..2d1b1a4bb0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
@@ -48,7 +47,6 @@ import
org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -148,18 +146,7 @@ public class TestContainerMetrics {
ContainerSet containerSet = new ContainerSet(1000);
StateContext context = ContainerTestUtils.getMockContext(
dd, CONF);
- ContainerMetrics metrics = ContainerMetrics.create(CONF);
- Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerProtos.ContainerType containerType :
- ContainerProtos.ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(containerType, CONF,
- context.getParent().getDatanodeDetails().getUuidString(),
- containerSet, volumeSet, metrics,
- c -> { }));
- }
- HddsDispatcher dispatcher = new HddsDispatcher(CONF, containerSet,
- volumeSet, handlers, context, metrics, null);
+ HddsDispatcher dispatcher = ContainerTestUtils.getHddsDispatcher(CONF,
containerSet, volumeSet, context);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
dispatcher.setClusterId(UUID.randomUUID().toString());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 630c4d3149..ab95467a71 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -200,19 +199,7 @@ public class TestContainerServer {
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
- Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerProtos.ContainerType containerType :
- ContainerProtos.ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(containerType, conf,
- dd.getUuid().toString(),
- containerSet, volumeSet, metrics,
- c -> {
- }));
- }
- HddsDispatcher hddsDispatcher = new HddsDispatcher(
- conf, containerSet, volumeSet, handlers, context, metrics, null);
+ HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf,
containerSet, volumeSet, context);
hddsDispatcher.setClusterId(scmId.toString());
return hddsDispatcher;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 8044685bb7..47be4daf90 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -25,7 +25,6 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -57,11 +56,9 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.client.SecretKeyTestClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import
org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -183,18 +180,7 @@ public class TestSecureContainerServer {
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
StateContext context = ContainerTestUtils.getMockContext(dd, conf);
- ContainerMetrics metrics = ContainerMetrics.create(conf);
- Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
- for (ContainerProtos.ContainerType containerType :
- ContainerProtos.ContainerType.values()) {
- handlers.put(containerType,
- Handler.getHandlerForContainerType(containerType, conf,
- dd.getUuid().toString(),
- containerSet, volumeSet, metrics,
- c -> { }));
- }
- HddsDispatcher hddsDispatcher = new HddsDispatcher(
- conf, containerSet, volumeSet, handlers, context, metrics,
+ HddsDispatcher hddsDispatcher = ContainerTestUtils.getHddsDispatcher(conf,
containerSet, volumeSet, context,
TokenVerifier.create(new SecurityConfig(conf), secretKeyClient));
hddsDispatcher.setClusterId(scmId.toString());
return hddsDispatcher;
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
index 5592926bf8..a0aba2a1b1 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerCommands.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -151,7 +152,9 @@ public class ContainerCommands implements Callable<Void>,
SubcommandWithParent {
volumeSet,
metrics,
containerReplicaProto -> {
- });
+ },
+ // Since this is an Ozone debug CLI, this instance is not part
of a running datanode.
+ new ContainerChecksumTreeManager(conf));
handler.setClusterID(clusterId);
handlers.put(containerType, handler);
}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index d471c13462..0c525457aa 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -193,7 +194,9 @@ public class ClosedContainerReplicator extends
BaseFreonGenerator implements
volumeSet,
metrics,
containerReplicaProto -> {
- });
+ },
+ // Since this is a Freon tool, this instance is not part of a
running datanode.
+ new ContainerChecksumTreeManager(conf));
handler.setClusterID(UUID.randomUUID().toString());
handlers.put(containerType, handler);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]