This is an automated email from the ASF dual-hosted git repository.
tomscut pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b564569f18 HDFS-17818. Fix serial fsimage transfer during checkpoint
with multiple namenodes (#7862)
8b564569f18 is described below
commit 8b564569f18906a11f91ab3f55a467edc36e9adf
Author: caozhiqiang <[email protected]>
AuthorDate: Fri Nov 28 17:48:15 2025 +0800
HDFS-17818. Fix serial fsimage transfer during checkpoint with multiple
namenodes (#7862)
Reviewed-by: Tao Li <[email protected]>
Signed-off-by: Tao Li <[email protected]>
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 ++
.../hdfs/server/namenode/CheckpointConf.java | 13 +++++
.../server/namenode/ha/StandbyCheckpointer.java | 3 +-
.../src/main/resources/hdfs-default.xml | 11 ++++
.../server/namenode/ha/TestStandbyCheckpoints.java | 62 +++++++++++++++++++++-
5 files changed, 90 insertions(+), 2 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c72bc50c0fb..df9e3907bda 100755
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -259,6 +259,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 1000000;
public static final String DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY =
"dfs.namenode.checkpoint.max-retries";
public static final int DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
+ public static final String
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY =
+ "dfs.namenode.checkpoint.parallel.upload.enabled";
+ public static final boolean
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT = false;
public static final String
DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_KEY =
"dfs.namenode.missing.checkpoint.periods.before.shutdown";
public static final int
DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_DEFAULT = 3;
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY =
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
index 4df170d7716..a5f8049c950 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
@@ -54,6 +54,12 @@ public class CheckpointConf {
*/
private double quietMultiplier;
+ /**
+ * Whether to enable the standby namenode to upload the fsimage to multiple other
namenodes in
+ * parallel, in the cluster with observer namenodes.
+ */
+ private final boolean parallelUploadEnabled;
+
public CheckpointConf(Configuration conf) {
checkpointCheckPeriod = conf.getTimeDuration(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@@ -68,6 +74,9 @@ public CheckpointConf(Configuration conf) {
legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
quietMultiplier =
conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
+ parallelUploadEnabled = conf.getBoolean(
+ DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
+ DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT);
warnForDeprecatedConfigs(conf);
}
@@ -106,4 +115,8 @@ public String getLegacyOivImageDir() {
public double getQuietPeriod() {
return this.checkpointPeriod * this.quietMultiplier;
}
+
+ public boolean isParallelUploadEnabled() {
+ return parallelUploadEnabled;
+ }
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index e93a384c99d..0488a17886c 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -249,7 +249,8 @@ private void doCheckpoint() throws InterruptedException,
IOException {
// Do this in a separate thread to avoid blocking transition to active,
but don't allow more
// than the expected number of tasks to run or queue up
// See HDFS-4816
- ExecutorService executor = new ThreadPoolExecutor(0,
activeNNAddresses.size(), 100,
+ int poolSize = checkpointConf.isParallelUploadEnabled() ?
activeNNAddresses.size() : 0;
+ ExecutorService executor = new ThreadPoolExecutor(poolSize,
activeNNAddresses.size(), 100,
TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
uploadThreadFactory);
// for right now, just match the upload to the nn address by convention.
There is no need to
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b9d8b67dc12..2b889dd2adc 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1397,6 +1397,17 @@
</description>
</property>
+<property>
+ <name>dfs.namenode.checkpoint.parallel.upload.enabled</name>
+ <value>false</value>
+ <description>
+ If true, the standby NameNode will upload the checkpoint image to multiple
other
+ NameNodes in parallel, in the cluster with observer namenodes. You should
+ make sure the network bandwidth is sufficient.
+ If false, the fsimage will be uploaded serially to multiple namenodes.
+ </description>
+</property>
+
<property>
<name>dfs.namenode.checkpoint.check.quiet-multiplier</name>
<value>1.5</value>
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index e010193bf3e..25926f695af 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -484,7 +484,67 @@ public Boolean get() {
// Assert that former active did not accept the canceled checkpoint file.
assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
}
-
+
+ /**
+ * Test that the standby namenode uploads the fsimage to multiple other namenodes in
parallel, in the
+ * cluster with observer namenodes.
+ */
+ @Test
+ @Timeout(value = 300)
+ public void testCheckpointParallelUpload() throws Exception {
+ // Set dfs.namenode.checkpoint.txns differently on the first NN to avoid it
+ // doing checkpoint when it becomes a standby
+ cluster.getConfiguration(0).setInt(
+ DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);
+
+ // don't compress, we want a big image
+ for (int i = 0; i < NUM_NNS; i++) {
+ cluster.getConfiguration(i).setBoolean(
+ DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+ }
+
+ // Throttle SBN upload to make it hang during upload to ANN, and enable
parallel fsimage upload.
+ for (int i = 1; i < NUM_NNS; i++) {
+ cluster.getConfiguration(i).setLong(
+ DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
+ cluster.getConfiguration(i).setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
true);
+ }
+ for (int i = 0; i < NUM_NNS; i++) {
+ cluster.restartNameNode(i);
+ }
+
+ // update references to each of the nns
+ setNNs();
+
+ cluster.transitionToActive(0);
+
+ doEdits(0, 100);
+
+ for (int i = 1; i < NUM_NNS; i++) {
+ HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
+ HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
+ }
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ int transferThreadCount = 0;
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] threads = threadBean.getThreadInfo(
+ threadBean.getAllThreadIds(), 1);
+ for (ThreadInfo thread: threads) {
+ if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
+ transferThreadCount++;
+ }
+ }
+ return transferThreadCount == NUM_NNS - 1;
+ }
+ }, 1000, 30000);
+ }
+
/**
* Make sure that clients will receive StandbyExceptions even when a
* checkpoint is in progress on the SBN, and therefore the
StandbyCheckpointer
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]