This is an automated email from the ASF dual-hosted git repository. simbadzina pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new f1e2ceb823e9 HDFS-13603: Do not propagate ExecutionException while initializing EDEK queues for keys. (#6860) f1e2ceb823e9 is described below commit f1e2ceb823e92ce864f7f2f327c4c0af722b4d85 Author: Yu Zhang <yzhan559...@gmail.com> AuthorDate: Mon Jun 3 09:10:06 2024 -0700 HDFS-13603: Do not propagate ExecutionException while initializing EDEK queues for keys. (#6860) --- .../hadoop/crypto/key/kms/KMSClientProvider.java | 6 +-- .../apache/hadoop/crypto/key/kms/ValueQueue.java | 20 ++++++++-- .../apache/hadoop/crypto/key/TestValueQueue.java | 43 ++++++++++++++++++++++ ...agerKeyGeneratorKeyProviderCryptoExtension.java | 9 +---- 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index f0c912224f90..6ee9068ea345 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -947,11 +947,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, @Override public void warmUpEncryptedKeys(String... keyNames) throws IOException { - try { - encKeyVersionQueue.initializeQueuesForKeys(keyNames); - } catch (ExecutionException e) { - throw new IOException(e); - } + encKeyVersionQueue.initializeQueuesForKeys(keyNames); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java index 58ce443146df..cbf419356343 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java @@ -269,12 +269,24 @@ public class ValueQueue <E> { * Initializes the Value Queues for the provided keys by calling the * fill Method with "numInitValues" values * @param keyNames Array of key Names - * @throws ExecutionException executionException. + * @throws IOException if initialization fails for any provided keys */ - public void initializeQueuesForKeys(String... keyNames) - throws ExecutionException { + public void initializeQueuesForKeys(String... keyNames) throws IOException { + int successfulInitializations = 0; + ExecutionException lastException = null; + for (String keyName : keyNames) { - keyQueues.get(keyName); + try { + keyQueues.get(keyName); + successfulInitializations++; + } catch (ExecutionException e) { + lastException = e; + } + } + + if (keyNames.length > 0 && successfulInitializations != keyNames.length) { + throw new IOException(String.format("Failed to initialize %s queues for the provided keys.", + keyNames.length - successfulInitializations), lastException); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java index 4805fca1d49f..6bf76b6e505f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java @@ -21,19 +21,27 @@ import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.Queue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.hadoop.crypto.key.kms.ValueQueue; import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller; import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; import org.junit.Assert; import org.junit.Test; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; + public class TestValueQueue { Logger LOG = LoggerFactory.getLogger(TestValueQueue.class); @@ -111,6 +119,41 @@ public class TestValueQueue { vq.shutdown(); } + /** + * Verifies that Queue is initialized (Warmed-up) for partial keys. + */ + @Test(timeout = 30000) + public void testPartialWarmUp() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue<String> vq = + new ValueQueue<>(10, 0.5f, 30000, 1, + SyncGenerationPolicy.ALL, filler); + + @SuppressWarnings("unchecked") + LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>> kq = + (LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>>) + FieldUtils.getField(ValueQueue.class, "keyQueues", true).get(vq); + + LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>> + kqSpy = spy(kq); + doThrow(new ExecutionException(new Exception())).when(kqSpy).get("k2"); + FieldUtils.writeField(vq, "keyQueues", kqSpy, true); + + Assert.assertThrows(IOException.class, () -> vq.initializeQueuesForKeys("k1", "k2", "k3")); + verify(kqSpy, times(1)).get("k2"); + + FillInfo[] fillInfos = + {filler.getTop(), filler.getTop(), filler.getTop()}; + Assert.assertEquals(5, fillInfos[0].num); + Assert.assertEquals(5, fillInfos[1].num); + Assert.assertNull(fillInfos[2]); + + Assert.assertEquals(new HashSet<>(Arrays.asList("k1", "k3")), + new HashSet<>(Arrays.asList(fillInfos[0].key, + fillInfos[1].key))); + vq.shutdown(); + } + /** * Verifies that the refill task is executed after "checkInterval" if * num values below "lowWatermark" diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java index 273c6733610d..bc9e6d7a9098 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java @@ -104,13 +104,8 @@ public class EagerKeyGeneratorKeyProviderCryptoExtension } @Override - public void warmUpEncryptedKeys(String... keyNames) throws - IOException { - try { - encKeyVersionQueue.initializeQueuesForKeys(keyNames); - } catch (ExecutionException e) { - throw new IOException(e); - } + public void warmUpEncryptedKeys(String... keyNames) throws IOException { + encKeyVersionQueue.initializeQueuesForKeys(keyNames); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org