This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 60215826fd HDDS-9883. Recon - Improve the performance of processing
IncrementalContainerReport from DN (#5793)
60215826fd is described below
commit 60215826fd082563ae74a526f04c24eb8d3865b4
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Thu Jan 4 20:27:52 2024 +0530
HDDS-9883. Recon - Improve the performance of processing
IncrementalContainerReport from DN (#5793)
---
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 8 ++++
.../common/src/main/resources/ozone-default.xml | 26 ++++++++++++
.../hdds/scm/server/ContainerReportQueue.java | 8 ++++
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 13 ++++++
.../hadoop/ozone/TestOzoneConfigurationFields.java | 5 ++-
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 17 ++++++++
.../org/apache/hadoop/ozone/recon/ReconUtils.java | 29 +++++++++++++
.../ozone/recon/scm/ReconContainerManager.java | 4 ++
.../ozone/recon/scm/ReconContainerReportQueue.java | 47 ++++++++++++++++++++++
.../ReconIncrementalContainerReportHandler.java | 33 +++++++++------
.../scm/ReconStorageContainerManagerFacade.java | 29 ++++++++++++-
...TestReconIncrementalContainerReportHandler.java | 15 ++++++-
12 files changed, 218 insertions(+), 16 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f124e24141..21c89cc3c8 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -668,6 +668,14 @@ public final class OzoneConfigKeys {
public static final String OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION =
"ozone.scm.close.container.wait.duration";
+ public static final String HDDS_SCM_CLIENT_RPC_TIME_OUT =
+ "hdds.scmclient.rpc.timeout";
+ public static final String HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT =
+ "hdds.scmclient.max.retry.timeout";
+ public static final String HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY =
+ "hdds.scmclient.failover.max.retry";
+
+
/**
* There is no need to instantiate this class.
*/
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c70a9630f0..079362f916 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3118,6 +3118,32 @@
SCM snapshot.
</description>
</property>
+ <property>
+ <name>ozone.recon.scmclient.rpc.timeout</name>
+ <value>1m</value>
+ <tag>OZONE, RECON, SCM</tag>
+ <description>
+ RpcClient timeout on waiting for the response from SCM when Recon
connects to SCM.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.scmclient.max.retry.timeout</name>
+ <value>6s</value>
+ <tag>OZONE, RECON, SCM</tag>
+ <description>
+ Max retry timeout for SCM Client when Recon connects to SCM. This config
is used to
+ dynamically compute the max retry count for SCM Client when failover
happens. Check the
+ SCMClientConfig class getRetryCount method.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.scmclient.failover.max.retry</name>
+ <value>3</value>
+ <tag>OZONE, RECON, SCM</tag>
+ <description>
+ Max retry count for SCM Client when failover happens.
+ </description>
+ </property>
<property>
<name>ozone.recon.om.socket.timeout</name>
<value>5s</value>
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
index b08b525a86..bffddff87b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
@@ -112,6 +112,9 @@ public class ContainerReportQueue
// 2. Add ICR report or merge to previous ICR
List<ContainerReport> dataList = dataMap.get(uuidString);
+ if (mergeIcr(val, dataList)) {
+ return true;
+ }
dataList.add(val);
++capacity;
orderingQueue.add(uuidString);
@@ -375,4 +378,9 @@ public class ContainerReportQueue
}
return 0;
}
+
+ protected boolean mergeIcr(ContainerReport val,
+ List<ContainerReport> dataList) {
+ return false;
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index aaadbbbcb9..38db618ef5 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -279,6 +279,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
public interface ContainerReport {
DatanodeDetails getDatanodeDetails();
ContainerReportType getType();
+ void mergeReport(ContainerReport val);
}
/**
@@ -334,6 +335,9 @@ public final class SCMDatanodeHeartbeatDispatcher {
return getDatanodeDetails().toString() + ", {type: " + getType()
+ ", size: " + getReport().getReportsList().size() + "}";
}
+
+ @Override
+ public void mergeReport(ContainerReport nextReport) { }
}
/**
@@ -374,6 +378,15 @@ public final class SCMDatanodeHeartbeatDispatcher {
return getDatanodeDetails().toString() + ", {type: " + getType()
+ ", size: " + getReport().getReportList().size() + "}";
}
+
+ @Override
+ public void mergeReport(ContainerReport nextReport) {
+ if (nextReport.getType() == ContainerReportType.ICR) {
+ getReport().getReportList().addAll(
+ ((ReportFromDatanode<IncrementalContainerReportProto>) nextReport)
+ .getReport().getReportList());
+ }
+ }
}
/**
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index cb29d61e1a..1a437be813 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -141,7 +141,10 @@ public class TestOzoneConfigurationFields extends
TestConfigurationFieldsBase {
ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
ScmConfigKeys.OZONE_SCM_HA_PREFIX,
S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
- HddsConfigKeys.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT
+ HddsConfigKeys.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT,
+ OzoneConfigKeys.HDDS_SCM_CLIENT_RPC_TIME_OUT,
+ OzoneConfigKeys.HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT,
+ OzoneConfigKeys.HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY
));
}
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index b3c601c4c1..ab87bda441 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -168,6 +168,23 @@ public final class ReconServerConfigKeys {
public static final String
OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT = "1m";
+
+ public static final String OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY =
+ "ozone.recon.scmclient.rpc.timeout";
+
+ public static final String OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT =
"1m";
+
+ public static final String OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY =
+ "ozone.recon.scmclient.max.retry.timeout";
+
+ public static final String OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT =
+ "6s";
+
+ public static final String OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY =
+ "ozone.recon.scmclient.failover.max.retry";
+
+ public static final int
+ OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;
/**
* Private constructor for utility class.
*/
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 0d0c57fbe3..c548561073 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -29,6 +29,9 @@ import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
@@ -36,7 +39,9 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
@@ -44,6 +49,9 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
@@ -51,9 +59,11 @@ import static org.jooq.impl.DSL.currentTimestamp;
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.using;
+import org.apache.hadoop.ozone.recon.scm.ReconContainerReportQueue;
import
org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jetbrains.annotations.NotNull;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +86,25 @@ public class ReconUtils {
return new ReconUtils().getReconDbDir(conf, OZONE_RECON_SCM_DB_DIR);
}
+ @NotNull
+ public static List<BlockingQueue<SCMDatanodeHeartbeatDispatcher
+ .ContainerReport>> initContainerReportQueue(
+ OzoneConfiguration configuration) {
+ int threadPoolSize =
+ configuration.getInt(ScmUtils.getContainerReportConfPrefix()
+ + ".thread.pool.size",
+ OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+ int queueSize = configuration.getInt(
+ ScmUtils.getContainerReportConfPrefix() + ".queue.size",
+ OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
+ List<BlockingQueue<SCMDatanodeHeartbeatDispatcher.ContainerReport>> queues
=
+ new ArrayList<>();
+ for (int i = 0; i < threadPoolSize; ++i) {
+ queues.add(new ReconContainerReportQueue(queueSize));
+ }
+ return queues;
+ }
+
/**
* Get configured Recon DB directory value based on config. If not present,
* fallback to ozone.metadata.dirs
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index e4108c2834..9d7c88dfc4 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -459,4 +459,8 @@ public class ReconContainerManager extends
ContainerManagerImpl {
return pipelineToOpenContainer;
}
+ @VisibleForTesting
+ public StorageContainerServiceProvider getScmClient() {
+ return scmClient;
+ }
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportQueue.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportQueue.java
new file mode 100644
index 0000000000..8d5f92eda4
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportQueue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.recon.scm;
+
+import org.apache.hadoop.hdds.scm.server.ContainerReportQueue;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
+
+import java.util.List;
+
+/**
+ * Customized queue to handle multiple ICR report together.
+ */
+public class ReconContainerReportQueue extends ContainerReportQueue {
+
+ public ReconContainerReportQueue(int queueSize) {
+ super(queueSize);
+ }
+
+ @Override
+ protected boolean mergeIcr(ContainerReport val,
+ List<ContainerReport> dataList) {
+ if (!dataList.isEmpty()) {
+ if (SCMDatanodeHeartbeatDispatcher.ContainerReportType.ICR
+ == dataList.get(dataList.size() - 1).getType()) {
+ dataList.get(dataList.size() - 1).mergeReport(val);
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index 18d995d053..1f2b1d5cf2 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -69,24 +69,33 @@ public class ReconIncrementalContainerReportHandler
ReconContainerManager containerManager =
(ReconContainerManager) getContainerManager();
+ try {
+ containerManager.checkAndAddNewContainerBatch(
+ report.getReport().getReportList());
+ } catch (Exception ioEx) {
+ LOG.error("Exception while checking and adding new container.", ioEx);
+ return;
+ }
boolean success = true;
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
+ ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
+ ContainerInfo container = null;
try {
- final ContainerID id = ContainerID.valueOf(
- replicaProto.getContainerID());
try {
- containerManager.checkAndAddNewContainer(id, replicaProto.getState(),
- report.getDatanodeDetails());
- } catch (Exception ioEx) {
- LOG.error("Exception while checking and adding new container.",
ioEx);
- return;
+ container = getContainerManager().getContainer(id);
+ // Ensure we reuse the same ContainerID instance in containerInfo
+ id = container.containerID();
+ } finally {
+ if (replicaProto.getState().equals(
+ ContainerReplicaProto.State.DELETED)) {
+ getNodeManager().removeContainer(dd, id);
+ } else {
+ getNodeManager().addContainer(dd, id);
+ }
}
- getNodeManager().addContainer(dd, id);
processContainerReplica(dd, replicaProto, publisher);
- } catch (ContainerNotFoundException e) {
- success = false;
- LOG.warn("Container {} not found!", replicaProto.getContainerID());
+ success = true;
} catch (NodeNotFoundException ex) {
success = false;
LOG.error("Received ICR from unknown datanode {}.",
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 464ec1a5ee..556c619419 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -99,11 +99,21 @@ import
org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
import com.google.inject.Inject;
+
import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
import static
org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_SCM_CLIENT_RPC_TIME_OUT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
@@ -182,6 +192,23 @@ public class ReconStorageContainerManagerFacade
.setSCM(this)
.build();
this.ozoneConfiguration = getReconScmConfiguration(conf);
+ long scmClientRPCTimeOut = conf.getTimeDuration(
+ OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY,
+ OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long scmClientMaxRetryTimeOut = conf.getTimeDuration(
+ OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY,
+ OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ int scmClientFailOverMaxRetryCount = conf.getInt(
+ OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY,
+ OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT);
+
+ conf.setLong(HDDS_SCM_CLIENT_RPC_TIME_OUT, scmClientRPCTimeOut);
+ conf.setLong(HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT, scmClientMaxRetryTimeOut);
+ conf.setLong(HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY,
+ scmClientFailOverMaxRetryCount);
+
this.scmStorageConfig = new ReconStorageConfig(conf, reconUtils);
this.clusterMap = new NetworkTopologyImpl(conf);
this.dbStore = DBStoreBuilder
@@ -283,7 +310,7 @@ public class ReconStorageContainerManagerFacade
ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
List<BlockingQueue<ContainerReport>> queues
- = ScmUtils.initContainerReportQueue(ozoneConfiguration);
+ = ReconUtils.initContainerReportQueue(ozoneConfiguration);
List<ThreadPoolExecutor> executors
= FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(
threadNamePrefix, queues);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
index e37226f965..00e0a56aab 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
@@ -30,7 +30,9 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -65,18 +67,28 @@ public class TestReconIncrementalContainerReportHandler
private HDDSLayoutVersionManager versionManager;
@Test
- public void testProcessICR() throws IOException, NodeNotFoundException {
+ public void testProcessICR()
+ throws IOException, NodeNotFoundException, TimeoutException {
ContainerID containerID = ContainerID.valueOf(100L);
DatanodeDetails datanodeDetails = randomDatanodeDetails();
IncrementalContainerReportFromDatanode reportMock =
mock(IncrementalContainerReportFromDatanode.class);
when(reportMock.getDatanodeDetails()).thenReturn(datanodeDetails);
+
+ ContainerWithPipeline containerWithPipeline = getTestContainer(
+ containerID.getId(), OPEN);
+ List<ContainerWithPipeline> containerWithPipelineList = new ArrayList<>();
+ containerWithPipelineList.add(containerWithPipeline);
+ ReconContainerManager containerManager = getContainerManager();
IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(containerID,
State.OPEN,
datanodeDetails.getUuidString());
when(reportMock.getReport()).thenReturn(containerReport);
+ when(getContainerManager().getScmClient()
+ .getExistContainerWithPipelinesInBatch(any(
+ ArrayList.class))).thenReturn(containerWithPipelineList);
final String path =
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
@@ -97,7 +109,6 @@ public class TestReconIncrementalContainerReportHandler
nodeManager.register(datanodeDetails, null, null);
- ReconContainerManager containerManager = getContainerManager();
ReconIncrementalContainerReportHandler reconIcr =
new ReconIncrementalContainerReportHandler(nodeManager,
containerManager, SCMContext.emptyContext());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]