This is an automated email from the ASF dual-hosted git repository.

jojochuang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62729b06d70 HDFS-17899. Handle InvalidEncryptionKeyException in 
Balancer Dispatcher, SPS BlockDispatcher and DataNode DataTransfer (#8383)
62729b06d70 is described below

commit 62729b06d7062e6739f8383af942ecdc0d3b1abb
Author: Zhenyu Li <[email protected]>
AuthorDate: Thu May 14 14:48:37 2026 -0400

    HDFS-17899. Handle InvalidEncryptionKeyException in Balancer Dispatcher, 
SPS BlockDispatcher and DataNode DataTransfer (#8383)
---
 .../sasl/DataEncryptionKeyFactory.java             |  10 ++
 .../hadoop/hdfs/server/balancer/Dispatcher.java    |  73 +++++++---
 .../hadoop/hdfs/server/balancer/KeyManager.java    |  18 +++
 .../hdfs/server/common/sps/BlockDispatcher.java    |  61 +++++---
 .../hadoop/hdfs/server/datanode/DataNode.java      |  51 +++++--
 .../balancer/TestDispatcherEncryptionKey.java      |  97 +++++++++++++
 .../hdfs/server/balancer/TestKeyManager.java       | 108 ++++++++++++++-
 .../server/common/sps/TestBlockDispatcher.java     | 153 +++++++++++++++++++++
 .../datanode/TestDataTransferEncryptionKey.java    |  75 ++++++++++
 9 files changed, 595 insertions(+), 51 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
index 959cba0fb48..a1674232fd7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
@@ -35,4 +35,14 @@ public interface DataEncryptionKeyFactory {
    * @throws IOException for any error
    */
   DataEncryptionKey newDataEncryptionKey() throws IOException;
+
+  /**
+   * Clear the cached data encryption key, so that a new key will be
+   * generated on the next call to {@link #newDataEncryptionKey()}.
+   * This is called when an InvalidEncryptionKeyException is received
+   * to force a key refresh on retry.
+   */
+  default void clearDataEncryptionKey() {
+    // no-op by default; caching factories such as KeyManager override this
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index acac65d7745..4fcf17f4a3b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import 
org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
@@ -372,29 +373,49 @@ private void dispatch() {
         LOG.info("Start moving " + this);
         assert !(reportedBlock instanceof DBlockStriped);
 
-        sock.connect(
-            NetUtils.createSocketAddr(target.getDatanodeInfo().
-                getXferAddr(Dispatcher.this.connectToDnViaHostname)),
-                HdfsConstants.READ_TIMEOUT);
-
-        // Set read timeout so that it doesn't hang forever against
-        // unresponsive nodes. Datanode normally sends IN_PROGRESS response
-        // twice within the client read timeout period (every 30 seconds by
-        // default). Here, we make it give up after 5 minutes of no response.
-        sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
-        sock.setKeepAlive(true);
-
-        OutputStream unbufOut = sock.getOutputStream();
-        InputStream unbufIn = sock.getInputStream();
         ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
             reportedBlock.getBlock());
-        final KeyManager km = nnc.getKeyManager(); 
-        Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb,
-            new StorageType[]{target.storageType}, new String[0]);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-            unbufIn, km, accessToken, target.getDatanodeInfo());
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
+        final KeyManager km = nnc.getKeyManager();
+        Token<BlockTokenIdentifier> accessToken = null;
+        OutputStream unbufOut;
+        InputStream unbufIn;
+        int encryptionKeyRetryCount = 0;
+        while (true) {
+          try {
+            accessToken = km.getAccessToken(eb,
+                new StorageType[]{target.storageType}, new String[0]);
+            sock.connect(
+                NetUtils.createSocketAddr(target.getDatanodeInfo().
+                    getXferAddr(Dispatcher.this.connectToDnViaHostname)),
+                    HdfsConstants.READ_TIMEOUT);
+
+            // Set read timeout so that it doesn't hang forever against
+            // unresponsive nodes. Datanode normally sends IN_PROGRESS
+            // response twice within the client read timeout period (every
+            // 30 seconds by default). Here, we make it give up after 5
+            // minutes of no response.
+            sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
+            sock.setKeepAlive(true);
+
+            unbufOut = sock.getOutputStream();
+            unbufIn = sock.getInputStream();
+            IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+                unbufIn, km, accessToken, target.getDatanodeInfo());
+            unbufOut = saslStreams.out;
+            unbufIn = saslStreams.in;
+            break;
+          } catch (InvalidEncryptionKeyException e) {
+            IOUtils.closeSocket(sock);
+            if (!prepareRetryAfterInvalidEncryptionKey(km,
+                ++encryptionKeyRetryCount)) {
+              throw e;
+            }
+            LOG.info("Retrying connection to {} for block {} after "
+                + "InvalidEncryptionKeyException",
+                target.getDatanodeInfo(), reportedBlock.getBlock(), e);
+            sock = new Socket();
+          }
+        }
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             ioFileBufferSize));
         in = new DataInputStream(new BufferedInputStream(unbufIn,
@@ -484,6 +505,16 @@ private void reset() {
     }
   }
 
+  private static boolean prepareRetryAfterInvalidEncryptionKey(KeyManager km,
+      int retryCount) throws IOException { // returns true => caller may retry
+    if (retryCount > 1) { // only the first failure is retried
+      return false;
+    }
+    km.updateBlockKeys(); // re-fetch block keys from the NameNode
+    km.clearDataEncryptionKey(); // drop cached key so a new one is generated
+    return true;
+  }
+
   /** A class for keeping track of block locations in the dispatcher. */
   public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
     public DBlock(Block block) {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
index 5644ef7d7da..3c0ed9a7eee 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java
@@ -148,6 +148,24 @@ public DataEncryptionKey newDataEncryptionKey() {
     }
   }
 
+  /**
+   * Clear the cached data encryption key, so that a new key will be generated
+   * on the next call to {@link #newDataEncryptionKey()}.
+   */
+  @Override
+  public synchronized void clearDataEncryptionKey() {
+    LOG.debug("Clearing data encryption key");
+    encryptionKey = null; // drop the cached key under this object's monitor
+  }
+
+  /** Fetch block keys from the NameNode; no-op if block tokens are disabled. */
+  public void updateBlockKeys() throws IOException {
+    if (isBlockTokenEnabled) { // otherwise the call does nothing
+      LOG.debug("Updating block keys from NameNode");
+      blockTokenSecretManager.addKeys(namenode.getBlockKeys());
+    }
+  }
+
   @Override
   public void close() {
     shouldRun = false;
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
index f7756c74851..95e66112f5b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockDispatcher.java
@@ -27,16 +27,19 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import 
org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -110,25 +113,42 @@ public BlockMovementStatus moveBlock(BlockMovingInfo 
blkMovingInfo,
     DataOutputStream out = null;
     DataInputStream in = null;
     try {
-      NetUtils.connect(sock,
-          NetUtils.createSocketAddr(
-              blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)),
-          socketTimeout);
-      // Set read timeout so that it doesn't hang forever against
-      // unresponsive nodes. Datanode normally sends IN_PROGRESS response
-      // twice within the client read timeout period (every 30 seconds by
-      // default). Here, we make it give up after "socketTimeout * 5" period
-      // of no response.
-      sock.setSoTimeout(socketTimeout * 5);
-      sock.setKeepAlive(true);
-      OutputStream unbufOut = sock.getOutputStream();
-      InputStream unbufIn = sock.getInputStream();
-      LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
+      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+          blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname));
+      OutputStream unbufOut;
+      InputStream unbufIn;
+      int encryptionKeyRetryCount = 0;
+      while (true) {
+        try {
+          NetUtils.connect(sock, targetAddr, socketTimeout);
+          // Set read timeout so that it doesn't hang forever against
+          // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+          // twice within the client read timeout period (every 30 seconds by
+          // default). Here, we make it give up after "socketTimeout * 5"
+          // period of no response.
+          sock.setSoTimeout(socketTimeout * 5);
+          sock.setKeepAlive(true);
+          unbufOut = sock.getOutputStream();
+          unbufIn = sock.getInputStream();
+          LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
 
-      IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, km, accessToken, blkMovingInfo.getTarget());
-      unbufOut = saslStreams.out;
-      unbufIn = saslStreams.in;
+          IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+              unbufIn, km, accessToken, blkMovingInfo.getTarget());
+          unbufOut = saslStreams.out;
+          unbufIn = saslStreams.in;
+          break;
+        } catch (InvalidEncryptionKeyException e) {
+          IOUtils.closeSocket(sock);
+          if (++encryptionKeyRetryCount > 1) {
+            throw e;
+          }
+          LOG.info("Retrying connection to {} for block {} after "
+              + "InvalidEncryptionKeyException",
+              blkMovingInfo.getTarget(), blkMovingInfo.getBlock(), e);
+          km.clearDataEncryptionKey();
+          sock = newSocket();
+        }
+      }
       out = new DataOutputStream(
           new BufferedOutputStream(unbufOut, ioFileBufferSize));
       in = new DataInputStream(
@@ -175,4 +195,9 @@ private static void receiveResponse(DataInputStream in) 
throws IOException {
     String logInfo = "reportedBlock move is failed";
     DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
   }
+
+  @VisibleForTesting
+  Socket newSocket() { // seam used for the retry socket; tests override it
+    return new Socket();
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7ee9a4b1729..2faa9c79a6e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -192,6 +192,7 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import 
org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
@@ -3063,10 +3064,6 @@ public void run() {
         final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
         InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
         LOG.debug("Connecting to datanode {}", dnAddr);
-        sock = newSocket();
-        NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
-        sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
-        sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
         //
         // Header info
@@ -3077,15 +3074,38 @@ public void run() {
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * 
(targets.length-1);
-        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-        InputStream unbufIn = NetUtils.getInputStream(sock);
         DataEncryptionKeyFactory keyFactory =
           getDataEncryptionKeyFactoryForBlock(b);
-        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
-          unbufIn, keyFactory, accessToken, bpReg);
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
-        
+        OutputStream unbufOut;
+        InputStream unbufIn;
+        int encryptionKeyRetryCount = 0;
+        while (true) {
+          try {
+            sock = newSocket();
+            NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
+            sock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
+            sock.setSoTimeout(targets.length * dnConf.socketTimeout);
+
+            unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+            unbufIn = NetUtils.getInputStream(sock);
+            IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+                unbufIn, keyFactory, accessToken, bpReg);
+            unbufOut = saslStreams.out;
+            unbufIn = saslStreams.in;
+            break;
+          } catch (InvalidEncryptionKeyException e) {
+            IOUtils.closeSocket(sock);
+            sock = null;
+            if (!prepareRetryAfterInvalidEncryptionKey(keyFactory,
+                ++encryptionKeyRetryCount)) {
+              throw e;
+            }
+            LOG.info("Retrying connection to {} for block {} after "
+                + "InvalidEncryptionKeyException",
+                curTarget, b, e);
+          }
+        }
+
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
@@ -3149,6 +3169,15 @@ public String toString() {
     }
   }
 
+  private static boolean prepareRetryAfterInvalidEncryptionKey(
+      DataEncryptionKeyFactory keyFactory, int retryCount) { // true => retry
+    if (retryCount > 1) { // only the first failure is retried
+      return false;
+    }
+    keyFactory.clearDataEncryptionKey(); // drop cached key before reconnecting
+    return true;
+  }
+
   /***
    * Use BlockTokenSecretManager to generate block token for current user.
    */
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java
new file mode 100644
index 00000000000..27356405a62
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestDispatcherEncryptionKey.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test Dispatcher handling of InvalidEncryptionKeyException.
+ */
+@Timeout(120)
+public class TestDispatcherEncryptionKey {
+
+  /**
+   * Verify that the dispatcher refreshes the block keys and clears the cached
+   * data encryption key before retrying InvalidEncryptionKeyException.
+   */
+  @Test
+  public void testClearEncryptionKeyOnRetry() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    CountingKeyManager km = new CountingKeyManager(conf);
+
+    assertTrue(prepareRetryAfterInvalidEncryptionKey(km, 1)); // 1st failure: retry
+    assertEquals(1, km.updateBlockKeysCount);
+    assertEquals(1, km.clearKeyCount);
+
+    assertFalse(prepareRetryAfterInvalidEncryptionKey(km, 2)); // 2nd: give up
+    assertEquals(1, km.updateBlockKeysCount); // counters unchanged by the refusal
+    assertEquals(1, km.clearKeyCount);
+  }
+
+  private static boolean prepareRetryAfterInvalidEncryptionKey(KeyManager km,
+      int retryCount) throws Exception { // reflective bridge into Dispatcher
+    Method method = Dispatcher.class.getDeclaredMethod(
+        "prepareRetryAfterInvalidEncryptionKey", KeyManager.class, int.class);
+    method.setAccessible(true); // the target method is private
+    return (boolean) method.invoke(null, km, retryCount); // null: static method
+  }
+
+  private static final class CountingKeyManager extends KeyManager { // call counter
+    private int updateBlockKeysCount;
+    private int clearKeyCount;
+
+    private CountingKeyManager(Configuration conf) throws Exception {
+      super("bp-test", createNamenode(), false, conf);
+    }
+
+    @Override
+    public void updateBlockKeys() { // records the call only
+      updateBlockKeysCount++;
+    }
+
+    @Override
+    public synchronized void clearDataEncryptionKey() { // records the call only
+      clearKeyCount++;
+    }
+  }
+
+  private static NamenodeProtocol createNamenode() { // proxy-backed stub
+    return (NamenodeProtocol) Proxy.newProxyInstance(
+        NamenodeProtocol.class.getClassLoader(),
+        new Class<?>[]{NamenodeProtocol.class},
+        (proxy, method, args) -> {
+          if ("getBlockKeys".equals(method.getName())) { // only supported call
+            return ExportedBlockKeys.DUMMY_KEYS;
+          }
+          throw new UnsupportedOperationException(method.getName());
+        });
+  }
+
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
index 3d60a73510b..9d8cbc93049 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.test.Whitebox;
 import org.apache.hadoop.util.FakeTimer;
@@ -31,6 +35,9 @@
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -82,4 +89,103 @@ public void testNewDataEncryptionKey() throws Exception {
     assertTrue(dekAfterExpiration.expiryDate > fakeTimer.now(),
         "KeyManager has an expired DataEncryptionKey!");
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testClearDataEncryptionKey() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+    final long keyUpdateInterval = 2 * 1000; // presumably milliseconds — confirm
+    final long tokenLifeTime = keyUpdateInterval;
+    final String blockPoolId = "bp-foo";
+    FakeTimer fakeTimer = new FakeTimer();
+    BlockTokenSecretManager btsm = new BlockTokenSecretManager(
+        keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null, false);
+    Whitebox.setInternalState(btsm, "timer", fakeTimer);
+
+    NamenodeProtocol namenode = createNamenode(btsm.exportKeys(), null); // no counter
+
+    KeyManager keyManager = new KeyManager(blockPoolId, namenode,
+        true, conf);
+    Whitebox.setInternalState(keyManager, "timer", fakeTimer); // swap in FakeTimer
+    Whitebox.setInternalState(
+        Whitebox.getInternalState(keyManager, "blockTokenSecretManager"),
+        "timer", fakeTimer);
+
+    // Get initial encryption key
+    final DataEncryptionKey dek1 = keyManager.newDataEncryptionKey();
+    assertNotNull(dek1, "Encryption key should not be null");
+
+    // Same cached key should be returned when not expired
+    final DataEncryptionKey dek1Again = keyManager.newDataEncryptionKey();
+    assertSame(dek1, dek1Again,
+        "Should return cached key when not expired");
+
+    // Clear the cached encryption key
+    keyManager.clearDataEncryptionKey();
+
+    // After clearing, a new key should be generated
+    final DataEncryptionKey dek2 = keyManager.newDataEncryptionKey();
+    assertNotNull(dek2, "New encryption key should not be null");
+    assertNotSame(dek1, dek2,
+        "Should generate a new key after clearing cached key");
+    assertTrue(dek2.expiryDate > fakeTimer.now(),
+        "New encryption key should not be expired");
+  }
+
+  @Test
+  public void testUpdateBlockKeysThenClearDataEncryptionKey()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+
+    final long keyUpdateInterval = 2 * 1000; // presumably milliseconds — confirm
+    final long tokenLifeTime = keyUpdateInterval;
+    final String blockPoolId = "bp-foo";
+    FakeTimer fakeTimer = new FakeTimer();
+    BlockTokenSecretManager btsm = new BlockTokenSecretManager(
+        keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null, false);
+    Whitebox.setInternalState(btsm, "timer", fakeTimer);
+
+    AtomicInteger getBlockKeysCount = new AtomicInteger(); // counts NameNode fetches
+    NamenodeProtocol namenode =
+        createNamenode(btsm.exportKeys(), getBlockKeysCount);
+
+    KeyManager keyManager = new KeyManager(blockPoolId, namenode,
+        true, conf);
+    Whitebox.setInternalState(keyManager, "timer", fakeTimer); // swap in FakeTimer
+    Whitebox.setInternalState(
+        Whitebox.getInternalState(keyManager, "blockTokenSecretManager"),
+        "timer", fakeTimer);
+
+    final DataEncryptionKey dek1 = keyManager.newDataEncryptionKey();
+    assertNotNull(dek1, "Encryption key should not be null");
+
+    keyManager.updateBlockKeys(); // same order the Dispatcher retry helper uses
+    keyManager.clearDataEncryptionKey();
+
+    final DataEncryptionKey dek2 = keyManager.newDataEncryptionKey();
+    assertNotNull(dek2, "New encryption key should not be null");
+    assertNotSame(dek1, dek2,
+        "Should generate a new key after updating block keys and clearing");
+    assertEquals(2, getBlockKeysCount.get()); // updateBlockKeys() + presumably ctor fetch
+  }
+
+  private static NamenodeProtocol createNamenode(ExportedBlockKeys blockKeys,
+      AtomicInteger getBlockKeysCount) { // counter may be null
+    return (NamenodeProtocol) Proxy.newProxyInstance(
+        NamenodeProtocol.class.getClassLoader(),
+        new Class<?>[]{NamenodeProtocol.class},
+        (proxy, method, args) -> {
+          if ("getBlockKeys".equals(method.getName())) { // only supported call
+            if (getBlockKeysCount != null) { // counting is optional
+              getBlockKeysCount.incrementAndGet();
+            }
+            return blockKeys;
+          }
+          throw new UnsupportedOperationException(method.getName());
+        });
+  }
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java
new file mode 100644
index 00000000000..ee553de27d6
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/sps/TestBlockDispatcher.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common.sps;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import 
org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test BlockDispatcher class.
+ */
+@Timeout(60)
+public class TestBlockDispatcher {
+
+  /**
+   * Verify that when InvalidEncryptionKeyException is encountered during
+   * block move, the dispatcher clears the cached data encryption key before
+   * retry.
+   */
+  @Test
+  public void testClearEncryptionKeyOnRetry() throws Exception {
+    DatanodeInfo target =
+        DFSTestUtil.getDatanodeInfo("127.0.0.1", "localhost", 1);
+    DatanodeInfo source =
+        DFSTestUtil.getDatanodeInfo("127.0.0.1", "localhost", 2);
+
+    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(
+        new Block(1, 100, 1001),
+        source, target,
+        StorageType.DISK, StorageType.ARCHIVE);
+
+    InvalidKeySaslClient saslClient = new InvalidKeySaslClient(); // always fails
+    CountingKeyFactory km = new CountingKeyFactory();
+    ExtendedBlock eb = new ExtendedBlock("bp-1", 1, 100, 1001);
+    Token<BlockTokenIdentifier> accessToken = new Token<>();
+
+    // Use small socketTimeout (100ms) to keep test fast.
+    BlockDispatcher dispatcher = new BlockDispatcher(100, 1024, false) {
+      @Override
+      Socket newSocket() { // the retry socket is also a fake
+        return new FakeSocket();
+      }
+    };
+
+    assertThrows(InvalidEncryptionKeyException.class,
+        () -> dispatcher.moveBlock(blkMovingInfo, saslClient, eb,
+            new FakeSocket(), km, accessToken));
+
+    assertEquals(1, km.clearCount); // key cleared once, before the single retry
+    assertEquals(2, saslClient.socketSendCount); // initial attempt + one retry
+  }
+
+  private static final class InvalidKeySaslClient
+      extends SaslDataTransferClient {
+    private int socketSendCount;
+
+    private InvalidKeySaslClient() {
+      super(null, null, null);
+    }
+
+    @Override
+    public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
+        InputStream underlyingIn, DataEncryptionKeyFactory 
encryptionKeyFactory,
+        Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+        throws InvalidEncryptionKeyException {
+      socketSendCount++; // count the attempt, then fail unconditionally
+      throw new InvalidEncryptionKeyException("test: encryption key expired");
+    }
+  }
+
+  private static final class CountingKeyFactory
+      implements DataEncryptionKeyFactory {
+    private int clearCount;
+
+    @Override
+    public DataEncryptionKey newDataEncryptionKey() {
+      return null; // never reached: the fake SASL client throws first
+    }
+
+    @Override
+    public void clearDataEncryptionKey() { // records the call only
+      clearCount++;
+    }
+  }
+
+  private static final class FakeSocket extends Socket { // in-memory stand-in
+    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    private final ByteArrayInputStream in =
+        new ByteArrayInputStream(new byte[0]);
+
+    @Override
+    public void connect(SocketAddress endpoint, int timeout) { // no-op
+    }
+
+    @Override
+    public SocketChannel getChannel() {
+      return null; // presumably makes NetUtils.connect use sock.connect() — confirm
+    }
+
+    @Override
+    public OutputStream getOutputStream() {
+      return out;
+    }
+
+    @Override
+    public InputStream getInputStream() {
+      return in;
+    }
+
+    @Override
+    public void close() { // no-op
+    }
+  }
+}
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java
new file mode 100644
index 00000000000..aeff6ad9776
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferEncryptionKey.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test DataNode.DataTransfer handling of InvalidEncryptionKeyException.
+ */
+@Timeout(60)
+public class TestDataTransferEncryptionKey {
+
+  /**
+   * Verify that DataTransfer clears the cached data encryption key before
+   * retrying InvalidEncryptionKeyException.
+   */
+  @Test
+  public void testClearEncryptionKeyOnRetry() throws Exception {
+    CountingKeyFactory keyFactory = new CountingKeyFactory();
+
+    assertTrue(prepareRetryAfterInvalidEncryptionKey(keyFactory, 1));
+    assertEquals(1, keyFactory.clearCount);
+
+    assertFalse(prepareRetryAfterInvalidEncryptionKey(keyFactory, 2));
+    assertEquals(1, keyFactory.clearCount);
+  }
+
+  private static boolean prepareRetryAfterInvalidEncryptionKey(
+      DataEncryptionKeyFactory keyFactory, int retryCount) throws Exception {
+    Method method = DataNode.class.getDeclaredMethod(
+        "prepareRetryAfterInvalidEncryptionKey",
+        DataEncryptionKeyFactory.class, int.class);
+    method.setAccessible(true);
+    return (boolean) method.invoke(null, keyFactory, retryCount);
+  }
+
+  private static final class CountingKeyFactory
+      implements DataEncryptionKeyFactory {
+    private int clearCount;
+
+    @Override
+    public DataEncryptionKey newDataEncryptionKey() {
+      return null;
+    }
+
+    @Override
+    public void clearDataEncryptionKey() {
+      clearCount++;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to