This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new b377848  Add KmsKeySupplier that calls a KmsClient to decrypt encryption keys. (#108)
b377848 is described below

commit b3778486c5073e122fe4aa6c7710528701d965fc
Author: Bruno Roustant <[email protected]>
AuthorDate: Mon Jul 1 15:29:42 2024 +0200

    Add KmsKeySupplier that calls a KmsClient to decrypt encryption keys. (#108)
---
 ENCRYPTION.md                                      |  47 ++-
 encryption/build.gradle                            |   9 +-
 .../encryption/EncryptionDirectoryFactory.java     |   9 +
 .../apache/solr/encryption/crypto/AesCtrUtil.java  |   2 +-
 .../org/apache/solr/encryption/kms/KmsClient.java  |  35 ++
 .../kms/KmsEncryptionRequestHandler.java           |  51 +++
 .../apache/solr/encryption/kms/KmsKeyCache.java    |  93 +++++
 .../apache/solr/encryption/kms/KmsKeySupplier.java | 393 +++++++++++++++++++++
 .../org/apache/solr/encryption/kms/KmsMetrics.java |  50 +++
 .../encryption/EncryptionBackupRepositoryTest.java |  18 +-
 .../solr/encryption/EncryptionHeavyLoadTest.java   |   6 +-
 .../encryption/EncryptionRequestHandlerTest.java   |   8 +-
 .../apache/solr/encryption/EncryptionTestUtil.java |  41 ++-
 .../EncryptionUpdateHandlerSplitTest.java          |   2 +-
 .../solr/encryption/EncryptionUpdateLogTest.java   |   2 +-
 .../apache/solr/encryption/TestingKeySupplier.java |  65 +---
 .../kms/KmsEncryptionRequestHandlerTest.java       |  51 +++
 .../solr/encryption/kms/KmsKeySupplierTest.java    | 341 ++++++++++++++++++
 .../solr/encryption/kms/SpyingKmsMetrics.java      |  32 ++
 .../solr/encryption/kms/TestingKmsClient.java      |  60 ++++
 .../solr/encryption/matcher/EventuallyMatcher.java |  66 ++++
 .../src/test/resources/configs/kms/schema.xml      |  35 ++
 .../src/test/resources/configs/kms/solrconfig.xml  |  82 +++++
 23 files changed, 1413 insertions(+), 85 deletions(-)

diff --git a/ENCRYPTION.md b/ENCRYPTION.md
index d8575ec..4bb948a 100644
--- a/ENCRYPTION.md
+++ b/ENCRYPTION.md
@@ -91,13 +91,15 @@ the specified folder.
 
 `keySupplierFactory` is a required parameter to specify your implementation of
 `org.apache.solr.encryption.KeySupplier.Factory`. This class is used to define your `KeySupplier`.
+You may use here the `org.apache.solr.encryption.kms.KmsKeySupplier` with your implementation of the
+`org.apache.solr.encryption.kms.KmsClient`. See more details in the `KmsKeySupplier` section below.
 
 `encrypterFactory` is an optional parameter to specify the `org.apache.solr.encryption.crypto.AesCtrEncrypterFactory`
 to use. By default `CipherAesCtrEncrypter$Factory` is used. You can change to `LightAesCtrEncrypter$Factory` for a
 more lightweight and efficient implementation (+10% perf), but it calls an internal com.sun.crypto.provider.AESCrypt()
 constructor which either logs a JDK warning (Illegal reflective access) with JDK 16 and below, or with JDK 17 and above
 requires to open the access to the com.sun.crypto.provider package with the jvm arg
-`--add-opens=java.base/com.sun.crypto.provider=ALL-UNNAMED`.
+`--add-opens=java.base/com.sun.crypto.provider=ALL-UNNAMED`. Both support encrypting files up to 17 TB.
 
 `EncryptionUpdateHandler` replaces the standard `DirectUpdateHandler2` (which it extends) to store persistently the
 encryption key id in the commit metadata. It supports all the configuration parameters of `DirectUpdateHandler2`.
@@ -105,7 +107,8 @@ encryption key id in the commit metadata. It supports all the configuration para
 `EncryptionUpdateLog` replaces the standard `UpdateLog` (which it extends) to support the encryption of the update
 logs.
 
-`EncryptionRequestHandler` receives (re)encryption requests. See its dedicated section below for its usage.
+`EncryptionRequestHandler` receives (re)encryption requests. See its dedicated `EncryptionRequestHandler` section below
+for its usage.
 
 `EncryptionMergePolicyFactory` is a wrapper above a delegate MergePolicyFactory (e.g. the standard
 `TieredMergePolicyFactory`) to ensure all index segments are re-written (re-encrypted).
@@ -113,6 +116,36 @@ logs.
 `EncryptionBackupRepository` ensures the encrypted files are copied encrypted to a delegate `BackupRepository`,
 but still verifies their checksum before the copy. It requires that you define a delegate `BackupRepository`
 
+## Getting keys from a Key Management System with KmsKeySupplier
+
+If you have a Key Management System to manage the encryption keys lifecycle, then you can use the
+`org.apache.solr.encryption.kms.KmsKeySupplier`. In this case, it requires that the Solr client sends some key blob
+to the `EncryptionRequestHandler` in addition to the key id. The key blob contains an encrypted form of the key secret
+and enough data for your KMS to decrypt it and provide the clear-text key secret. The key blob is stored in the
+metadata of each index file. When needed, the `KmsKeySupplier` calls your KMS with your `KmsClient` to decrypt the
+key blob and stores the key secret in an in-memory key cache, whose entries are wiped automatically after a short
+duration.
+
+`KmsKeySupplier` requires defining `KmsEncryptionRequestHandler` as the `EncryptionRequestHandler`. It requires
+the parameters `tenantId` and `encryptionKeyBlob` to be sent in the `SolrQueryRequest` when calling
+`KmsEncryptionRequestHandler`.
+
+*solrconfig.xml*
+
+```xml
+<config>
+
+    <directoryFactory name="DirectoryFactory"
+                      class="org.apache.solr.encryption.EncryptionDirectoryFactory">
+        <str name="keySupplierFactory">org.apache.solr.encryption.kms.KmsKeySupplier$Factory</str>
+        <str name="kmsClientFactory">com.yourApp.YourKmsClient$Factory</str>
+    </directoryFactory>
+
+    <requestHandler name="/admin/encrypt" class="org.apache.solr.encryption.kms.KmsEncryptionRequestHandler"/>
+
+</config>
+```
+
 ## Calling EncryptionRequestHandler
 
 Once Solr is set up, it is ready to encrypt. To set the encryption key id to use, the Solr client calls the
@@ -159,10 +192,10 @@ chosen because it is simpler.
 
 The performance benchmark was run in LUCENE-9379. Here is the summary:
 
-- An OS-level encryption is better and faster.
-- If really it’s not possible, expect an average of -20% perf impact on most queries, -60% on multi-term queries.
+- An OS-level encryption is faster.
+- Otherwise, expect an average of -20% perf impact on most queries, -60% on multi-term queries.
 - You can use the `LightAesCtrEncrypter$Factory` to get +10% perf. This is a simple config change. See the
-solrconfig.xml configuration section above.
+`solrconfig.xml` configuration section above.
 - You can make the Lucene Codec store its FST on heap and expect +15% perf, at the price of more Java heap usage. This
 requires a code change. See `org.apache.lucene.util.fst.FSTStore` implementations and usage in
 `org.apache.lucene.codecs.lucene90.blocktree.FieldReader`.
@@ -171,5 +204,5 @@ requires a code change. See `org.apache.lucene.util.fst.FSTStore` implementation
 
 The `org.apache.solr.encryption.crypto` package contains utility classes to stream encryption/decryption with the
 `AES/CTR/NoPadding` transformation.
-`CharStreamEncrypter` can encrypt a character stream to a base 64 encoding compatible with JSON, with a small
-buffer.
\ No newline at end of file
+`CharStreamEncrypter` can encrypt a character stream to a base 64 encoding compatible with JSON, and requires only a
+small work buffer.
\ No newline at end of file
diff --git a/encryption/build.gradle b/encryption/build.gradle
index 3247b32..336862a 100644
--- a/encryption/build.gradle
+++ b/encryption/build.gradle
@@ -37,8 +37,13 @@ dependencies {
     implementation 'org.apache.lucene:lucene-core:9.10.0'
     implementation 'com.google.code.findbugs:jsr305:3.0.2'
 
-    // commons-io and commons-codec are only required by the tool class
-    // CharStreamEncrypter, which is not used for the index encryption.
+    // Optional, used by the KmsKeySupplier.
+    implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'
+    implementation 'io.opentracing:opentracing-util:0.33.0'
+
+    // Optional, commons-io and commons-codec are only required by the
+    // tool class CharStreamEncrypter, which is not used for the index
+    // encryption.
     implementation 'commons-io:commons-io:2.11.0'
     implementation 'commons-codec:commons-codec:1.16.0'
 
diff --git a/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java b/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java
index 102005f..ab88bef 100644
--- a/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java
+++ b/encryption/src/main/java/org/apache/solr/encryption/EncryptionDirectoryFactory.java
@@ -57,7 +57,16 @@ public class EncryptionDirectoryFactory extends MMapDirectoryFactory {
   //  Right now, EncryptionDirectoryFactory extends MMapDirectoryFactory. And we hope we will
   //  refactor later.
 
+  /**
+   * Required Solr config parameter to define the {@link KeySupplier.Factory} class used
+   * to create the {@link KeySupplier}.
+   */
   public static final String PARAM_KEY_SUPPLIER_FACTORY = "keySupplierFactory";
+  /**
+   * Optional Solr config parameter to set the {@link AesCtrEncrypterFactory} class used
+   * to create the {@link org.apache.solr.encryption.crypto.AesCtrEncrypter}.
+   * The default is {@link CipherAesCtrEncrypter}.
+   */
   public static final String PARAM_ENCRYPTER_FACTORY = "encrypterFactory";
   /**
    * Visible for tests only - Property defining the class name of the inner encryption directory factory.
diff --git a/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java
index 63c397a..03f6d47 100644
--- a/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java
+++ b/encryption/src/main/java/org/apache/solr/encryption/crypto/AesCtrUtil.java
@@ -44,7 +44,7 @@ public class AesCtrUtil {
   public static boolean checkAesKey(byte[] key) {
     if (key.length != 16 && key.length != 24 && key.length != 32) {
       // AES requires either 128, 192 or 256 bits keys.
-      throw new IllegalArgumentException("Invalid AES key length; it must be either 128, 192 or 256 bits long");
+      throw new IllegalArgumentException("Invalid AES key length " + key.length + "; it must be either 128, 192 or 256 bits long");
     }
     return true;
   }
diff --git a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsClient.java b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsClient.java
new file mode 100644
index 0000000..d39e6d0
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsClient.java
@@ -0,0 +1,35 @@
+package org.apache.solr.encryption.kms;
+
+import org.apache.solr.common.util.NamedList;
+
+import java.io.Closeable;
+
+/**
+ * Client that calls the Key Management System.
+ */
+public interface KmsClient extends Closeable {
+
+    /**
+     * Decrypts the key blob (ciphered form of the key) to get the clear-text key secret bytes.
+     * @param keyId The key id passed as parameter to the {@link KmsEncryptionRequestHandler}.
+     * @param keyBlob The key blob passed as parameter to the {@link KmsEncryptionRequestHandler}.
+     *               It contains the ciphered key to decrypt, and may contain other data.
+     * @param tenantId The identifier of the tenant owning the key.
+     * @param requestId A request id for tracing/log purposes.
+     * @return The clear-text key secret bytes.
+     */
+    byte[] decrypt(String keyId, String keyBlob, String tenantId, String requestId) throws Exception;
+
+    /**
+     * Creates a {@link KmsClient}.
+     * Only one {@link KmsClient} singleton is created.
+     */
+    interface Factory {
+
+        /**
+         * Creates a {@link KmsClient}.
+         * @param args The Solr config parameters of the {@code directoryFactory} section.
+         */
+        KmsClient create(NamedList<?> args) throws Exception;
+    }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java
new file mode 100644
index 0000000..65f5989
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandler.java
@@ -0,0 +1,51 @@
+package org.apache.solr.encryption.kms;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.encryption.EncryptionRequestHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+import java.util.Map;
+
+/**
+ * Extension of {@link EncryptionRequestHandler} that gets required parameters
+ * {@link #PARAM_TENANT_ID} and {@link #PARAM_ENCRYPTION_KEY_BLOB} from the
+ * {@link SolrQueryRequest} to pass later to the {@link KmsKeySupplier}.
+ */
+public class KmsEncryptionRequestHandler extends EncryptionRequestHandler {
+
+    /**
+     * Tenant Id request parameter - required.
+     */
+    public static final String PARAM_TENANT_ID = "tenantId";
+    /**
+     * Data Key Blob request parameter - required.
+     */
+    public static final String PARAM_ENCRYPTION_KEY_BLOB = "encryptionKeyBlob";
+
+    /**
+     *  Builds the KMS key cookie based on the tenant id and key blob parameters of the request.
+     *  If a required parameter is missing, this method throws a {@link SolrException} with
+     *  {@link SolrException.ErrorCode#BAD_REQUEST} and sets the response status to failure.
+     */
+    @Override
+    protected Map<String, String> buildKeyCookie(String keyId,
+                                                 SolrQueryRequest req,
+                                                 SolrQueryResponse rsp) {
+        String tenantId = getRequiredRequestParam(req, PARAM_TENANT_ID, rsp);
+        String encryptionKeyBlob = getRequiredRequestParam(req, PARAM_ENCRYPTION_KEY_BLOB, rsp);
+        return Map.of(
+                PARAM_TENANT_ID, tenantId,
+                PARAM_ENCRYPTION_KEY_BLOB, encryptionKeyBlob
+        );
+    }
+
+    private String getRequiredRequestParam(SolrQueryRequest req, String param, SolrQueryResponse rsp) {
+        String arg = req.getParams().get(param);
+        if (arg == null || arg.isEmpty()) {
+            rsp.add(STATUS, STATUS_FAILURE);
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Required parameter " + param + " must be present and not empty.");
+        }
+        return arg;
+    }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsKeyCache.java b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsKeyCache.java
new file mode 100644
index 0000000..aaf8e2f
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsKeyCache.java
@@ -0,0 +1,93 @@
+package org.apache.solr.encryption.kms;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.github.benmanes.caffeine.cache.Ticker;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Cache of encryption keys. Keys are key ids and values are clear-text key secrets.
+ * Entries are wiped automatically after a time-based expiration with a dedicated
+ * thread which is stopped when {@link #close()} is called.
+ */
+public class KmsKeyCache implements Closeable {
+
+    /**
+     * Expected number of keys in the cache.
+     * Internally, the Caffeine cache stores entries in a ConcurrentHashMap, which computes
+     * the power-of-2 table size with (initialCapacity / loadFactor) + 1.
+     */
+    private static final int INITIAL_CAPACITY = 30;
+
+    private final ScheduledExecutorService executorService;
+    private final Cache<String, byte[]> keySecretCache;
+
+    /**
+     * @param cacheExpiration Each entry in the cache expires after this duration, and it is wiped.
+     * @param ticker Ticker to use to get the wall clock time.
+     */
+    protected KmsKeyCache(Duration cacheExpiration, Ticker ticker) {
+        executorService = Executors.newScheduledThreadPool(
+                1,
+                new DaemonThreadFactory("EncryptionKeyCache"));
+        // We use a Caffeine cache not to limit the cache size and optimize the removal policy,
+        // but rather to leverage the scheduled removal of expired entries, with a listener to
+        // wipe the key secrets from the memory when they are removed.
+        keySecretCache = Caffeine.newBuilder()
+                // No maximum size. There will be one entry per active encryption key per
+                // collection. We don't expect the size to go very high.
+                .initialCapacity(INITIAL_CAPACITY)
+                // Evict keys from the cache after expiration.
+                .expireAfterWrite(cacheExpiration)
+                .executor(executorService)
+                .ticker(ticker)
+                // Ensure time-based expiration by using a scheduler thread.
+                .scheduler(Scheduler.forScheduledExecutorService(executorService))
+                // Wipe the key secret bytes in memory when it is evicted from the cache.
+                .removalListener(createCacheRemovalListener())
+                .build();
+    }
+
+    protected RemovalListener<String, byte[]> createCacheRemovalListener() {
+        return (keyId, secret, removalCause) -> {
+            if (secret != null) {
+                // Wipe the key secret bytes when an entry is removed, to guarantee
+                // the key secret is not in memory anymore.
+                Arrays.fill(secret, (byte) 0);
+            }};
+    }
+
+    /**
+     * @return The backing {@link Cache}.
+     */
+    protected Cache<String, byte[]> getCache() {
+        return keySecretCache;
+    }
+
+    @Override
+    public void close() {
+        executorService.shutdownNow();
+    }
+
+    private static class DaemonThreadFactory extends SolrNamedThreadFactory {
+
+        DaemonThreadFactory(String namePrefix) {
+            super(namePrefix);
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}
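
Reviewer note on the KmsKeyCache above: the wipe-on-removal idea can be sketched with the JDK alone. This is only an illustration under stated assumptions, not the committed implementation: `WipingKeyStore` is a hypothetical name, there is no time-based expiration, and Caffeine is not used. It shows that zeroing the stored array also zeroes it for any caller still holding the same reference.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a key cache: JDK only, no expiration. */
final class WipingKeyStore {

    private final ConcurrentMap<String, byte[]> secrets = new ConcurrentHashMap<>();

    /** Stores the secret; a different array previously stored for the id is wiped. */
    void put(String keyId, byte[] secret) {
        byte[] previous = secrets.put(keyId, secret);
        if (previous != null && previous != secret) {
            wipe(previous);
        }
    }

    /** Returns the stored array itself (not a copy), or null if absent. */
    byte[] get(String keyId) {
        return secrets.get(keyId);
    }

    /** Removes the entry and overwrites the secret bytes with zeros. */
    void remove(String keyId) {
        byte[] removed = secrets.remove(keyId);
        if (removed != null) {
            wipe(removed);
        }
    }

    private static void wipe(byte[] secret) {
        Arrays.fill(secret, (byte) 0);
    }
}
```

Because the array is shared rather than copied, code that keeps a secret beyond the lifetime of its cache entry would observe zeroed bytes after removal; copying on read is the alternative, at the price of more copies of the secret in memory.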
diff --git a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsKeySupplier.java b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsKeySupplier.java
new file mode 100644
index 0000000..859fb47
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsKeySupplier.java
@@ -0,0 +1,393 @@
+package org.apache.solr.encryption.kms;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+
+import io.opentracing.util.GlobalTracer;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.encryption.KeySupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_ENCRYPTION_KEY_BLOB;
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_TENANT_ID;
+
+/**
+ * Supplies keys by decrypting them with a Key Management System (KMS).
+ * The keys are stored in their encrypted form in the index. The encrypted form is sent to KMS to decrypt it
+ * and get the clear-text key secret. The key secrets are cached in memory for a configurable short
+ * duration, to avoid calling the KMS too often.
+ * <p>
+ * Thread safe.
+ */
+public class KmsKeySupplier implements KeySupplier {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    /**
+     * Min delay between a failed call to KMS decrypt and the next attempt for the
+     * same key, in ns.
+     */
+    public static final long FAILED_DECRYPT_DELAY_NS = TimeUnit.SECONDS.toNanos(10);
+
+    /** Log when the KMS decrypt call takes more time than this duration threshold, in ns. */
+    private static final long KEY_DECRYPT_DURATION_THRESHOLD_NS = TimeUnit.SECONDS.toNanos(3);
+
+    /**
+     * File name extensions/suffixes that do NOT need to be encrypted because they lack user/external data.
+     * Other files should be encrypted.
+     * There is some human judgement here as some files may contain vague clues as to the shape of the data.
+     */
+    private static final Set<String> CLEARTEXT_EXTENSIONS = Set.of(
+            "doc",    // Document number, frequencies, and skip data
+            "pos",    // Positions
+            "pay",    // Payloads and offsets
+            "dvm",    // Doc values metadata
+            "fdm",    // Stored fields metadata
+            "fdx",    // Stored fields index
+            "nvd",    // Norms data
+            "nvm",    // Norms metadata
+            "vem",    // Vector metadata
+            "fnm",    // Field Infos
+            "si",     // Segment Infos
+            "cfe"     // Compound file entries
+    );
+    // Extensions known to contain sensitive user data, and thus that need to be encrypted:
+    // tip    - BlockTree terms index (FST)
+    // tim    - BlockTree terms
+    // tmd    - BlockTree metadata (contains first and last term)
+    // fdt    - Stored fields data
+    // dvd    - Doc values data
+    // vex    - Vector index
+    // cfs    - Compound file (contains all the above files data)
+
+    // Cleartext temporary files:
+    private static final String TMP_EXTENSION = "tmp";
+    private static final String TMP_DOC_IDS = "-doc_ids"; // FieldsIndexWriter
+    private static final String TMP_FILE_POINTERS = "file_pointers"; // FieldsIndexWriter
+
+    private static final Base64.Encoder ID_ENCODER = Base64.getEncoder().withoutPadding();
+
+    private final KmsClient kmsClient;
+    private final KmsKeyCache kmsKeyCache;
+    private final ConcurrentMap<String, Long> nextDecryptTimeCache;
+    private final KmsMetrics kmsMetrics;
+    private final Ticker ticker;
+
+    protected KmsKeySupplier(
+            KmsClient kmsClient,
+            KmsKeyCache kmsKeyCache,
+            ConcurrentMap<String, Long> nextDecryptTimeCache,
+            KmsMetrics kmsMetrics,
+            Ticker ticker) {
+        this.kmsClient = kmsClient;
+        this.kmsKeyCache = kmsKeyCache;
+        this.nextDecryptTimeCache = nextDecryptTimeCache;
+        this.kmsMetrics = kmsMetrics;
+        this.ticker = ticker;
+    }
+
+    @Override
+    public boolean shouldEncrypt(String fileName) {
+        return shouldEncryptFile(fileName);
+    }
+
+    public static boolean shouldEncryptFile(String fileName) {
+        String extension = IndexFileNames.getExtension(fileName);
+        if (extension == null) {
+            // segments and pending_segments are never passed as parameter of this method.
+            assert !fileName.startsWith(IndexFileNames.SEGMENTS) && !fileName.startsWith(IndexFileNames.PENDING_SEGMENTS);
+        } else if (CLEARTEXT_EXTENSIONS.contains(extension)) {
+            // The file extension tells us it does not need to be encrypted.
+            return false;
+        } else if (extension.equals(TMP_EXTENSION)) {
+            // We know some tmp files do not need to be encrypted.
+            int tmpCounterIndex = fileName.lastIndexOf('_');
+            assert tmpCounterIndex != -1;
+            return !endsWith(fileName, TMP_DOC_IDS, tmpCounterIndex)
+                    && !endsWith(fileName, TMP_FILE_POINTERS, tmpCounterIndex);
+        }
+        // By default, all other files should be encrypted.
+        return true;
+    }
+
+    private static boolean endsWith(String s, String suffix, int endIndex) {
+        // Inspired from JDK String where endsWith calls startsWith.
+        // Here we look for [suffix] from index [endIndex - suffix.length()].
+        // This is equivalent to
+        // s.substring(0, endIndex).endsWith(suffix)
+        // without creating a substring.
+        return s.startsWith(suffix, endIndex - suffix.length());
+    }
+
+    @Nullable
+    @Override
+    public Map<String, String> getKeyCookie(String keyId, Map<String, String> params) throws IOException {
+        // With this KeySupplier implementation, the key cookie (encrypted form) is expected to be passed
+        // to the EncryptionRequestHandler.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] getKeySecret(String keyId, Function<String, Map<String, String>> cookieSupplier) throws IOException {
+        // This method must be thread safe because there is only one instance of KeySupplier
+        // per EncryptionDirectoryFactory.
+        try {
+            if (keyId == null) {
+                // Short-circuit the key cache.
+                return decryptOrDelay(null, cookieSupplier);
+            }
+            return kmsKeyCache.getCache().get(keyId, k -> decryptOrDelay(k, cookieSupplier));
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
+        }
+    }
+
+    private byte[] decryptOrDelay(String keyId, Function<String, Map<String, String>> cookieSupplier) {
+        // Check if a call to decrypt the same key failed recently.
+        String decryptTimeKeyId = keyId == null ?
+                // If the key id is null, use this key blob string instead.
+                // This way we can still use the anti-flood mechanism.
+                cookieSupplier.apply(null).get(PARAM_ENCRYPTION_KEY_BLOB)
+                : keyId;
+        Long nextTimeNs = nextDecryptTimeCache.get(decryptTimeKeyId);
+        if (nextTimeNs != null && ticker.read() < nextTimeNs) {
+            // The key decryption failed very recently.
+            // Do not flood-call KMS, fail with a delay exception.
+            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+                    "Delaying the next attempt to decrypt key " + decryptTimeKeyId);
+        }
+        try {
+            // Call KMS to decrypt the key.
+            byte[] keySecret = decrypt(keyId, cookieSupplier);
+            if (nextTimeNs != null) {
+                nextDecryptTimeCache.remove(decryptTimeKeyId);
+            }
+            return keySecret;
+        } catch (RuntimeException e) {
+            // The call to KMS failed for some reason (network, unavailability, certificate, key id).
+            // Record the next time to attempt the same call.
+            nextDecryptTimeCache.put(decryptTimeKeyId, ticker.read() + FAILED_DECRYPT_DELAY_NS);
+            throw e;
+        }
+    }
+
+    private byte[] decrypt(String keyId, Function<String, Map<String, String>> cookieSupplier) {
+        boolean success = false;
+        try {
+            kmsMetrics.incKeyDecrypt();
+            Map<String, String> keyCookie = cookieSupplier.apply(keyId);
+            long startTimeNs = ticker.read();
+            byte[] keySecret = kmsClient.decrypt(
+                    keyId,
+                    keyCookie.get(PARAM_ENCRYPTION_KEY_BLOB),
+                    keyCookie.get(PARAM_TENANT_ID),
+                    getRequestId());
+            long timeEndNs;
+            if ((timeEndNs = ticker.read()) - startTimeNs >= KEY_DECRYPT_DURATION_THRESHOLD_NS) {
+                kmsMetrics.incSlowKeyDecrypt();
+                log.warn("decrypt took {} ms", TimeUnit.NANOSECONDS.toMillis(timeEndNs - startTimeNs));
+            }
+            success = true;
+            return keySecret;
+        } catch (RuntimeException e) {
+            log.error("failed to decrypt DEK", e);
+            throw e;
+        } catch (Exception e) {
+            log.error("failed to decrypt DEK", e);
+            throw new RuntimeException(e);
+        } finally {
+            if (!success) {
+                kmsMetrics.incFailedKeyDecrypt();
+            }
+        }
+    }
+
+    /**
+     * Gets the trace id of the active {@link io.opentracing.Span} if available, otherwise generates a random id.
+     */
+    protected String getRequestId() {
+        String requestId;
+        if (GlobalTracer.get().activeSpan() != null) {
+            requestId = GlobalTracer.get().activeSpan().context().toTraceId();
+        } else {
+            // Generate a random ID faster than a UUID, as it does not rely on java.security.SecureRandom.
+            byte[] bytes = new byte[16];
+            ThreadLocalRandom.current().nextBytes(bytes);
+            requestId = ID_ENCODER.encodeToString(bytes);
+        }
+        return requestId;
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(kmsKeyCache);
+        IOUtils.closeQuietly(kmsClient);
+    }
+
+    /**
+     * Creates a singleton {@link KmsKeySupplier} calling a singleton {@link KmsClient}.
+     * This factory is used only once in {@link org.apache.solr.encryption.EncryptionDirectoryFactory}
+     * to create a single {@link KmsKeySupplier}.
+     */
+    public static class Factory implements KeySupplier.Factory {
+
+        /**
+         * Required Solr config parameter to define the {@link KmsClient.Factory} class used
+         * to create the {@link KmsClient} singleton.
+         */
+        public static final String PARAM_KMS_CLIENT_FACTORY = "kmsClientFactory";
+        /**
+         * Optional Solr config parameter to set the key cache expiration, in minutes.
+         * The default expiration is {@link #KEY_CACHE_EXPIRATION}.
+         */
+        public static final String PARAM_KEY_CACHE_EXPIRATION = "keyCache.expirationMin";
+        /** Default expiration of each key cache entry. */
+        public static final Duration KEY_CACHE_EXPIRATION = Duration.ofMinutes(15);
+
+        protected static final String KEY_SUPPLIER = KmsKeySupplier.class.getName();
+
+        private KeySupplier keySupplier;
+
+        @Override
+        public void init(NamedList<?> args, CoreContainer coreContainer) {
+            // The keySupplier is a singleton stored in the CoreContainer object cache.
+            // It is Closeable, so it will be closed automatically when the CoreContainer closes the ObjectCache.
+            keySupplier = coreContainer
+                    .getObjectCache()
+                    .computeIfAbsent(KEY_SUPPLIER, KeySupplier.class, k -> createKeySupplier(args, coreContainer));
+        }
+
+        private KeySupplier createKeySupplier(NamedList<?> args, CoreContainer coreContainer) {
+            KmsMetrics kmsMetrics = null;
+            KmsClient kmsClient = null;
+            KmsKeyCache kmsKeyCache = null;
+            boolean success = false;
+            try {
+                kmsMetrics = createEncryptionMetrics(coreContainer);
+                KmsClient.Factory kmsClientFactory = getKmsClientFactory(args, coreContainer);
+                kmsClient = kmsClientFactory.create(args);
+                kmsKeyCache = createKeyCache(getKeyCacheExpiration(args));
+                KeySupplier keySupplier = createKeySupplier(kmsClient, kmsKeyCache, kmsMetrics);
+                log.info("KmsKeySupplier singleton created");
+                success = true;
+                return keySupplier;
+            } catch (Throwable t) {
+                // If something fails during the creation of the KMS client, return an InvalidKeySupplier.
+                // That way, Solr can be used normally if the encryption is not used.
+                // But any attempt to use the InvalidKeySupplier will throw an exception with the root cause.
+                log.error("Failed to create the key supplier; encryption is not available", t);
+                return new InvalidKeySupplier(t);
+            } finally {
+                if (!success) {
+                    IOUtils.closeQuietly(kmsClient);
+                    IOUtils.closeQuietly(kmsKeyCache);
+                    if (kmsMetrics != null) {
+                        kmsMetrics.incFailedKmsInit();
+                    }
+                }
+            }
+        }
+
+        private KmsClient.Factory getKmsClientFactory(NamedList<?> args, CoreContainer coreContainer) {
+            String kmsClientFactoryClass = args._getStr(PARAM_KMS_CLIENT_FACTORY, System.getProperty("solr." + PARAM_KMS_CLIENT_FACTORY));
+            if (kmsClientFactoryClass == null) {
+                throw new IllegalArgumentException("Missing " + PARAM_KMS_CLIENT_FACTORY + " argument for " + getClass().getName());
+            }
+            return coreContainer.getResourceLoader().newInstance(kmsClientFactoryClass, KmsClient.Factory.class);
+        }
+
+        protected Duration getKeyCacheExpiration(NamedList<?> args) {
+            Duration keyCacheExpiration;
+            String expirationString = (String) args.get(PARAM_KEY_CACHE_EXPIRATION);
+            if (expirationString == null) {
+                keyCacheExpiration = KEY_CACHE_EXPIRATION;
+            } else {
+                try {
+                    keyCacheExpiration = Duration.ofMinutes(Long.parseLong(expirationString));
+                } catch (NumberFormatException e) {
+                    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                            "invalid integer value '" + expirationString
+                                    + "' for parameter " + PARAM_KEY_CACHE_EXPIRATION);
+                }
+            }
+            return keyCacheExpiration;
+        }
+
+        protected KmsKeyCache createKeyCache(Duration cacheExpiration) {
+            return new KmsKeyCache(cacheExpiration, Ticker.systemTicker());
+        }
+
+        protected KmsMetrics createEncryptionMetrics(CoreContainer coreContainer) {
+            return new KmsMetrics(coreContainer);
+        }
+
+        protected KeySupplier createKeySupplier(
+                KmsClient kmsClient,
+                KmsKeyCache kmsKeyCache,
+                KmsMetrics kmsMetrics) {
+            return new KmsKeySupplier(
+                    kmsClient,
+                    kmsKeyCache,
+                    new ConcurrentHashMap<>(),
+                    kmsMetrics,
+                    Ticker.systemTicker());
+        }
+
+        @Override
+        public KeySupplier create() throws IOException {
+            // Return the KmsKeySupplier singleton instead of creating a new instance.
+            return keySupplier;
+        }
+
+        /**
+         * Provided when the construction of the {@link KmsKeySupplier} fails.
+         */
+        private static class InvalidKeySupplier implements KeySupplier {
+
+            final Throwable cause;
+
+            InvalidKeySupplier(Throwable cause) {
+                this.cause = cause;
+            }
+
+            @Override
+            public boolean shouldEncrypt(String fileName) {
+                return KmsKeySupplier.shouldEncryptFile(fileName);
+            }
+
+            @Override
+            public Map<String, String> getKeyCookie(String keyId, Map<String, String> params) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public byte[] getKeySecret(String keyId, Function<String, Map<String, String>> cookieSupplier) {
+                throw new SolrException(
+                        SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+                        "Not available as KmsKeySupplier.Factory failed to initialize",
+                        cause);
+            }
+
+            @Override
+            public void close() {}
+        }
+    }
+}
diff --git a/encryption/src/main/java/org/apache/solr/encryption/kms/KmsMetrics.java b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsMetrics.java
new file mode 100644
index 0000000..9db86ed
--- /dev/null
+++ b/encryption/src/main/java/org/apache/solr/encryption/kms/KmsMetrics.java
@@ -0,0 +1,50 @@
+package org.apache.solr.encryption.kms;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.solr.core.CoreContainer;
+
+/**
+ * Metrics for calls to the Key Management System.
+ */
+public class KmsMetrics {
+
+    private static final String KMS_METRICS_SCOPE = "ADMIN./admin/encrypt.kms";
+
+    private static final String KMS_INIT_METRIC = KMS_METRICS_SCOPE + ".init";
+    private static final String KMS_INIT_NUM_ERRORS = KMS_INIT_METRIC + ".numErrors";
+
+    private static final String KEY_DECRYPT_METRIC = KMS_METRICS_SCOPE + ".decrypt";
+    private static final String KEY_DECRYPT_TOTAL = KEY_DECRYPT_METRIC + ".total";
+    private static final String KEY_DECRYPT_NUM_ERRORS = KEY_DECRYPT_METRIC + ".numErrors";
+    private static final String KEY_DECRYPT_NUM_SLOW = KEY_DECRYPT_METRIC + ".numSlowOps";
+
+    protected final Counter numFailedKmsInit;
+    protected final Counter numKeyDecrypt;
+    protected final Counter numFailedKeyDecrypt;
+    protected final Counter numSlowKeyDecrypt;
+
+    public KmsMetrics(CoreContainer coreContainer) {
+        MetricRegistry metricRegistry = coreContainer.getMetricManager().registry("solr.node");
+        numFailedKmsInit = metricRegistry.counter(KMS_INIT_NUM_ERRORS);
+        numKeyDecrypt = metricRegistry.counter(KEY_DECRYPT_TOTAL);
+        numFailedKeyDecrypt = metricRegistry.counter(KEY_DECRYPT_NUM_ERRORS);
+        numSlowKeyDecrypt = metricRegistry.counter(KEY_DECRYPT_NUM_SLOW);
+    }
+
+    public void incFailedKmsInit() {
+        numFailedKmsInit.inc();
+    }
+
+    public void incKeyDecrypt() {
+        numKeyDecrypt.inc();
+    }
+
+    public void incFailedKeyDecrypt() {
+        numFailedKeyDecrypt.inc();
+    }
+
+    public void incSlowKeyDecrypt() {
+        numSlowKeyDecrypt.inc();
+    }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionBackupRepositoryTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionBackupRepositoryTest.java
index 3795ab7..8624aaa 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/EncryptionBackupRepositoryTest.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionBackupRepositoryTest.java
@@ -65,7 +65,7 @@ public class EncryptionBackupRepositoryTest extends AbstractBackupRepositoryTest
     }
 
     @Override
-    protected URI getBaseUri() throws URISyntaxException {
+    protected URI getBaseUri() {
         return baseUri;
     }
 
@@ -112,6 +112,7 @@ public class EncryptionBackupRepositoryTest extends AbstractBackupRepositoryTest
                 // When we copy the encrypted file with the LocalFileSystemRepository,
                 // then it fails because the encrypted checksum is invalid.
                 String destinationFolder = "destination-folder";
+                deleteRepoDirIfExists(encryptionRepoName, destinationFolder, repoFactory, resourceLoader);
                 expectThrows(
                         CorruptIndexException.class,
                         () -> copyFileToRepo(fsSourceDir, fileName, localRepoName, destinationFolder, repoFactory, resourceLoader));
@@ -199,6 +200,7 @@ public class EncryptionBackupRepositoryTest extends AbstractBackupRepositoryTest
                 // then it succeeds because the checksum is not verified,
                 // and the file is copied in encrypted form.
                 String destinationFolder = "destination-folder";
+                deleteRepoDirIfExists(encryptionRepoName, destinationFolder, repoFactory, resourceLoader);
                 copyFileToRepo(encSourceDir, fileName, encryptionRepoName, destinationFolder, repoFactory, resourceLoader);
                 // Check the copy starts with the encryption magic, not the regular codec magic, this means it is encrypted.
                 assertEquals(ENCRYPTION_MAGIC, readCodecMagic(fileName, encryptionRepoName, destinationFolder, repoFactory, resourceLoader));
@@ -233,6 +235,20 @@ public class EncryptionBackupRepositoryTest extends AbstractBackupRepositoryTest
         return new PluginInfo("repository", attrs, initArgs, null);
     }
 
+    private void deleteRepoDirIfExists(
+            String repoName,
+            String destinationFolder,
+            BackupRepositoryFactory repoFactory,
+            SolrResourceLoader resourceLoader)
+            throws IOException {
+        try (BackupRepository repo = repoFactory.newInstance(resourceLoader, repoName)) {
+            URI destinationDir = repo.resolve(getBaseUri(), destinationFolder);
+            if (repo.exists(destinationDir)) {
+                repo.deleteDirectory(destinationDir);
+            }
+        }
+    }
+
     private void copyFileToRepo(
             Directory dir,
             String fileName,
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
index 3459cf6..e5971d5 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionHeavyLoadTest.java
@@ -28,7 +28,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,7 +54,6 @@ import static org.apache.solr.encryption.TestingKeySupplier.KEY_ID_3;
  * files are decrypted correctly when refreshing the index searcher after each
  * commit.
  */
-@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
 public class EncryptionHeavyLoadTest extends SolrCloudTestCase {
 
   // Change the test duration manually to run longer, e.g. 20 minutes.
@@ -91,7 +89,7 @@ public class EncryptionHeavyLoadTest extends SolrCloudTestCase {
   public static void beforeClass() throws Exception {
     EncryptionTestUtil.setInstallDirProperty();
     cluster = new MiniSolrCloudCluster.Builder(2, createTempDir())
-      .addConfig("config", EncryptionTestUtil.getConfigPath("collection1"))
+      .addConfig("config", EncryptionTestUtil.getRandomConfigPath())
       .configure();
   }
 
@@ -211,7 +209,7 @@ public class EncryptionHeavyLoadTest extends SolrCloudTestCase {
     return random.nextFloat() <= PROBABILITY_OF_WAITING_ENCRYPTION_COMPLETION;
   }
 
-  private EncryptionStatus sendEncryptionRequests(String keyId) {
+  private EncryptionStatus sendEncryptionRequests(String keyId) throws Exception {
     EncryptionStatus encryptionStatus = testUtil.encrypt(keyId);
     print("encrypt keyId=" + keyId + " => response success=" + encryptionStatus.isSuccess() + " complete=" + encryptionStatus.isComplete());
     return encryptionStatus;
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
index a943bae..30cfe2b 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionRequestHandlerTest.java
@@ -58,7 +58,7 @@ public class EncryptionRequestHandlerTest extends SolrCloudTestCase {
     System.setProperty(PROPERTY_INNER_ENCRYPTION_DIRECTORY_FACTORY, MockFactory.class.getName());
     EncryptionTestUtil.setInstallDirProperty();
     cluster = new MiniSolrCloudCluster.Builder(2, createTempDir())
-      .addConfig("config", EncryptionTestUtil.getConfigPath("collection1"))
+      .addConfig("config", EncryptionTestUtil.getRandomConfigPath())
       .configure();
   }
 
@@ -74,7 +74,11 @@ public class EncryptionRequestHandlerTest extends SolrCloudTestCase {
     solrClient = cluster.getSolrClient();
     CollectionAdminRequest.createCollection(collectionName, 2, 2).process(solrClient);
     cluster.waitForActiveCollection(collectionName, 2, 4);
-    testUtil = new EncryptionTestUtil(solrClient, collectionName);
+    testUtil = createEncryptionTestUtil(solrClient, collectionName);
+  }
+
+  protected EncryptionTestUtil createEncryptionTestUtil(CloudSolrClient solrClient, String collectionName) {
+    return new EncryptionTestUtil(solrClient, collectionName);
   }
 
   @Override
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java
index ebb9538..3321bef 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.RetryUtil;
+import org.apache.solr.encryption.kms.TestingKmsClient;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -40,11 +41,14 @@ import java.nio.file.Path;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import static org.apache.lucene.tests.util.LuceneTestCase.random;
 import static org.apache.solr.encryption.EncryptionRequestHandler.ENCRYPTION_STATE;
 import static org.apache.solr.encryption.EncryptionRequestHandler.PARAM_KEY_ID;
 import static org.apache.solr.encryption.EncryptionRequestHandler.STATE_COMPLETE;
 import static org.apache.solr.encryption.EncryptionRequestHandler.STATUS;
 import static org.apache.solr.encryption.EncryptionRequestHandler.STATUS_SUCCESS;
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_ENCRYPTION_KEY_BLOB;
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_TENANT_ID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -53,6 +57,16 @@ import static org.junit.Assert.assertTrue;
  */
 public class EncryptionTestUtil {
 
+  public static final String TENANT_ID = "tenantIdSolr";
+  public static final String KEY_BLOB = "{" +
+          "\"keyId\":\"%s\"," +
+          "\"keyVersion\":\"0-a-4-a-2\"," +
+          "\"cipherBlob\":\"a+K/8+p+l0\"," +
+          "\"iv\":\"A/k\"," +
+          "\"algorithm\":\"AES-GCM\"," +
+          "\"auth\":\"Q-Z\"," +
+          "}";
+
   private final CloudSolrClient cloudSolrClient;
   private final String collectionName;
   private int docId;
@@ -83,6 +97,13 @@ public class EncryptionTestUtil {
     return SolrTestCaseJ4.getFile("src/test/resources/configs/" + configDir).toPath();
   }
 
+  /**
+   * Gets the path of a random sub-dir of the encryption module test config.
+   */
+  public static Path getRandomConfigPath() {
+    return getConfigPath(random().nextBoolean() ? "collection1" : "kms");
+  }
+
   /**
    * Adds one doc per provided text, and commits.
    */
@@ -104,9 +125,11 @@ public class EncryptionTestUtil {
     assertEquals(expectedNumResults, response.getResults().size());
   }
 
-  public EncryptionStatus encrypt(String keyId) {
+  public EncryptionStatus encrypt(String keyId) throws Exception {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(PARAM_KEY_ID, keyId);
+    params.set(PARAM_TENANT_ID, TENANT_ID);
+    params.set(PARAM_ENCRYPTION_KEY_BLOB, generateKeyBlob(keyId));
     GenericSolrRequest encryptRequest = new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params);
     EncryptionStatus encryptionStatus = new EncryptionStatus();
     forAllReplicas(replica -> {
@@ -117,16 +140,26 @@ public class EncryptionTestUtil {
     return encryptionStatus;
   }
 
-  public void encryptAndExpectCompletion(String keyId) {
+  private String generateKeyBlob(String keyId) throws Exception {
+    return TestingKmsClient.singleton == null ?
+            generateMockKeyBlob(keyId)
+            : TestingKmsClient.singleton.generateKeyBlob(keyId, TENANT_ID);
+  }
+
+  public static String generateMockKeyBlob(String keyId) {
+    return String.format(KEY_BLOB, keyId);
+  }
+
+  public void encryptAndExpectCompletion(String keyId) throws Exception {
     encryptAndCheck(keyId, true);
   }
 
-  public void encryptAndWaitForCompletion(String keyId) throws InterruptedException {
+  public void encryptAndWaitForCompletion(String keyId) throws Exception {
     encryptAndCheck(keyId, false);
     waitUntilEncryptionIsComplete(keyId);
   }
 
-  private void encryptAndCheck(String keyId, boolean expectComplete) {
+  private void encryptAndCheck(String keyId, boolean expectComplete) throws Exception {
     EncryptionStatus encryptionStatus = encrypt(keyId);
     assertTrue(encryptionStatus.isSuccess());
     assertEquals(expectComplete, encryptionStatus.isComplete());
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerSplitTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerSplitTest.java
index 4e40031..bd4bf0d 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerSplitTest.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateHandlerSplitTest.java
@@ -50,7 +50,7 @@ public class EncryptionUpdateHandlerSplitTest extends SolrCloudTestCase {
     System.setProperty("solr.updateHandler", TestEncryptionUpdateHandler.class.getName());
     EncryptionTestUtil.setInstallDirProperty();
     cluster = new MiniSolrCloudCluster.Builder(2, createTempDir())
-      .addConfig("config", EncryptionTestUtil.getConfigPath("collection1"))
+      .addConfig("config", EncryptionTestUtil.getRandomConfigPath())
       .configure();
   }
 
diff --git a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
index 50fe76b..049efa1 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/EncryptionUpdateLogTest.java
@@ -63,7 +63,7 @@ public class EncryptionUpdateLogTest extends SolrCloudTestCase {
     EncryptionTestUtil.setInstallDirProperty();
     System.setProperty("solr.updateLog", TestingEncryptionUpdateLog.class.getName());
     cluster = new MiniSolrCloudCluster.Builder(NUM_SHARDS, createTempDir())
-      .addConfig("config", EncryptionTestUtil.getConfigPath("collection1"))
+      .addConfig("config", EncryptionTestUtil.getRandomConfigPath())
       .configure();
   }
 
diff --git a/encryption/src/test/java/org/apache/solr/encryption/TestingKeySupplier.java b/encryption/src/test/java/org/apache/solr/encryption/TestingKeySupplier.java
index 3dafea0..32e75c1 100644
--- a/encryption/src/test/java/org/apache/solr/encryption/TestingKeySupplier.java
+++ b/encryption/src/test/java/org/apache/solr/encryption/TestingKeySupplier.java
@@ -16,9 +16,9 @@
  */
 package org.apache.solr.encryption;
 
-import org.apache.lucene.index.IndexFileNames;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.encryption.kms.KmsKeySupplier;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -26,7 +26,6 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.function.Function;
 
 /**
@@ -46,74 +45,17 @@ public class TestingKeySupplier implements KeySupplier {
   public static final byte[] KEY_SECRET_1 = "12345678901234567890123456789012".getBytes(StandardCharsets.UTF_8);
   public static final byte[] KEY_SECRET_2 = "34567890123456789012345678901234".getBytes(StandardCharsets.UTF_8);
   public static final byte[] KEY_SECRET_3 = "78901234567890123456789012345678".getBytes(StandardCharsets.UTF_8);
-  private static final Map<String, byte[]> KEY_SECRETS = Map.of(KEY_ID_1, KEY_SECRET_1,
+  public static final Map<String, byte[]> KEY_SECRETS = Map.of(KEY_ID_1, KEY_SECRET_1,
                                                                  KEY_ID_2, KEY_SECRET_2,
                                                                  KEY_ID_3, KEY_SECRET_3);
 
   private static final String KEY_BLOB_PARAM = "keyBlob";
 
-  /**
-   * File name extensions/suffixes that do NOT need to be encrypted because it lacks user/external data.
-   * Other files should be encrypted.
-   * There is some human judgement here as some files may contain vague clues as to the shape of the data.
-   */
-  private static final Set<String> CLEARTEXT_EXTENSIONS = Set.of(
-    "doc",    // Document number, frequencies, and skip data
-    "pos",    // Positions
-    "pay",    // Payloads and offsets
-    "dvm",    // Doc values metadata
-    "fdm",    // Stored fields metadata
-    "fdx",    // Stored fields index
-    "nvd",    // Norms data
-    "nvm",    // Norms metadata
-    "fnm",    // Field Infos
-    "si",     // Segment Infos
-    "cfe"     // Compound file entries
-    );
-  // Extensions known to contain sensitive user data, and thus that need to be encrypted:
-  // tip    - BlockTree terms index (FST)
-  // tim    - BlockTree terms
-  // tmd    - BlockTree metadata (contains first and last term)
-  // fdt    - Stored fields data
-  // dvd    - Doc values data
-  // cfs    - Compound file (contains all the above files data)
-
-  // Cleartext temporary files:
-  private static final String TMP_EXTENSION = "tmp";
-  private static final String TMP_DOC_IDS = "-doc_ids"; // FieldsIndexWriter
-  private static final String TMP_FILE_POINTERS = "file_pointers"; // FieldsIndexWriter
-
   private TestingKeySupplier() {}
 
   @Override
   public boolean shouldEncrypt(String fileName) {
-    String extension = IndexFileNames.getExtension(fileName);
-    if (extension == null) {
-      // segments and pending_segments are never passed as parameter of this method.
-      assert !fileName.startsWith(IndexFileNames.SEGMENTS) && !fileName.startsWith(IndexFileNames.PENDING_SEGMENTS);
-    } else if (CLEARTEXT_EXTENSIONS.contains(extension)) {
-      // The file extension tells us it does not need to be encrypted.
-      return false;
-    } else if (extension.equals(TMP_EXTENSION)) {
-      // We know some tmp files do not need to be encrypted.
-      int tmpCounterIndex = fileName.lastIndexOf('_');
-      assert tmpCounterIndex != -1;
-      if (endsWith(fileName, TMP_DOC_IDS, tmpCounterIndex)
-      || endsWith(fileName, TMP_FILE_POINTERS, tmpCounterIndex)) {
-        return false;
-      }
-    }
-    // By default, all other files should be encrypted.
-    return true;
-  }
-
-  private static boolean endsWith(String s, String suffix, int endIndex) {
-    // Inspired from JDK String where endsWith calls startsWith.
-    // Here we look for [suffix] from index [endIndex - suffix.length()].
-    // This is equivalent to
-    // s.substring(0, endIndex).endsWith(suffix)
-    // without creating a substring.
-    return s.startsWith(suffix, endIndex - suffix.length());
+    return KmsKeySupplier.shouldEncryptFile(fileName);
   }
 
   @Override
@@ -179,7 +121,6 @@ public class TestingKeySupplier implements KeySupplier {
 
     @Override
     public void init(NamedList<?> args, CoreContainer coreContainer) {
-      // Do nothing.
     }
 
     @Override
diff --git a/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java b/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java
new file mode 100644
index 0000000..24780a6
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/kms/KmsEncryptionRequestHandlerTest.java
@@ -0,0 +1,51 @@
+package org.apache.solr.encryption.kms;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.encryption.EncryptionTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.solr.encryption.EncryptionRequestHandler.PARAM_KEY_ID;
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_ENCRYPTION_KEY_BLOB;
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_TENANT_ID;
+
+/**
+ * Tests {@link KmsEncryptionRequestHandler}.
+ */
+public class KmsEncryptionRequestHandlerTest extends SolrCloudTestCase {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    EncryptionTestUtil.setInstallDirProperty();
+    cluster = new MiniSolrCloudCluster.Builder(2, createTempDir())
+            .addConfig("config", EncryptionTestUtil.getConfigPath("collection1"))
+            .configure();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    cluster.shutdown();
+  }
+
+  @Test(expected = SolrException.class)
+  public void testEncryptRequest_NoEncryptionKeyBlobParam() throws Exception {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(PARAM_KEY_ID, "keyId");
+    params.set(PARAM_TENANT_ID, "tenantId");
+    cluster.getSolrClient().request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params));
+  }
+
+  @Test(expected = SolrException.class)
+  public void testEncryptRequest_NoTenantIdParam() throws Exception {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(PARAM_KEY_ID, "keyId");
+    params.set(PARAM_ENCRYPTION_KEY_BLOB, "keyBlob");
+    cluster.getSolrClient().request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/encrypt", params));
+  }
+}
\ No newline at end of file
diff --git a/encryption/src/test/java/org/apache/solr/encryption/kms/KmsKeySupplierTest.java b/encryption/src/test/java/org/apache/solr/encryption/kms/KmsKeySupplierTest.java
new file mode 100644
index 0000000..3fb89ed
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/kms/KmsKeySupplierTest.java
@@ -0,0 +1,341 @@
+package org.apache.solr.encryption.kms;
+
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.encryption.EncryptionTestUtil;
+import org.apache.solr.encryption.KeySupplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_ENCRYPTION_KEY_BLOB;
+import static org.apache.solr.encryption.kms.KmsEncryptionRequestHandler.PARAM_TENANT_ID;
+import static org.apache.solr.encryption.kms.KmsKeySupplier.Factory.PARAM_KMS_CLIENT_FACTORY;
+import static org.apache.solr.encryption.matcher.EventuallyMatcher.eventually;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+/**
+ * Tests {@link KmsKeySupplier} and its {@link KmsKeyCache}.
+ */
+public class KmsKeySupplierTest extends SolrTestCaseJ4 {
+
+    private static final Map<String, String> SECRETS = Map.of(
+            "k1", "s1",
+            "k2", "s2"
+    );
+    private static final String BLOB_SUFFIX = "Blob";
+
+    private SpyingKeySupplierFactory keySupplierFactory;
+    private MockKmsClient kmsClient;
+    private CookieSupplier cookieSupplier;
+    private KeySupplier keySupplier;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        initCore("solrconfig.xml", "schema.xml", EncryptionTestUtil.getConfigPath().toString());
+        keySupplierFactory = new SpyingKeySupplierFactory();
+        NamedList<String> args = new NamedList<>();
+        args.add(PARAM_KMS_CLIENT_FACTORY, TestingKmsClient.Factory.class.getName());
+        keySupplierFactory.init(args, h.getCoreContainer());
+        keySupplierFactory.kmsMetrics.reset();
+        kmsClient = keySupplierFactory.kmsClient;
+        cookieSupplier = new CookieSupplier();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (keySupplier != null) {
+                keySupplier.close();
+            }
+            deleteCore();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    @Test
+    public void testKeyCacheMissAndHit() throws Exception {
+        keySupplier = keySupplierFactory.create();
+
+        verifyCacheMiss("k1");
+        verifyCacheHit("k1");
+        verifyCacheMiss("k2");
+        verifyCacheHit("k2");
+        verifyCacheHit("k1");
+
+        assertKeyDecryptMetrics(2, 0);
+    }
+
+    @Test
+    public void testKeyCacheExpiration() throws Exception {
+        Duration cacheExpiration = Duration.ofMillis(300);
+        AtomicLong timeNs = new AtomicLong(System.nanoTime());
+        keySupplierFactory.setCacheExpiration(cacheExpiration);
+        keySupplierFactory.setTicker(timeNs::get);
+        keySupplier = keySupplierFactory.create();
+
+        verifyCacheMiss("k1");
+        verifyCacheHit("k1");
+        assertEquals(List.of(), keySupplierFactory.removedKeyIds);
+        // Verify that without interaction, the entry is removed after the expiration.
+        // We use the test ticker to simulate the time elapsed.
+        timeNs.addAndGet(cacheExpiration.toNanos() * 2L);
+        // Wait a bit for the Caffeine cache to read the ticker. Caffeine batches checks
+        // and operations each second; it is not configurable.
+        assertThat(() -> keySupplierFactory.removedKeyIds, eventually(equalTo(List.of("k1")), Duration.ofSeconds(2)));
+
+        keySupplierFactory.removedKeyIds.clear();
+        verifyCacheMiss("k1");
+        verifyCacheHit("k1");
+        assertEquals(List.of(), keySupplierFactory.removedKeyIds);
+
+        assertKeyDecryptMetrics(2, 0);
+    }
+
+    @Test
+    public void testExceptionHandling() throws Exception {
+        AtomicLong timeNs = new AtomicLong(System.nanoTime());
+        keySupplierFactory.setTicker(timeNs::get);
+        keySupplier = keySupplierFactory.create();
+
+        // Verify that an unknown key exception is propagated.
+        try {
+            keySupplier.getKeySecret("k3", cookieSupplier);
+            fail("Expected exception not thrown");
+        } catch (NoSuchElementException e) {
+            // Expected.
+        }
+
+        // Verify that any KMS exception is propagated.
+        try {
+            kmsClient.simulatedException = new Exception("Test");
+            keySupplier.getKeySecret("k1", cookieSupplier);
+            fail("Expected exception not thrown");
+        } catch (RuntimeException e) {
+            // Expected.
+            assertEquals(kmsClient.simulatedException, e.getCause());
+            kmsClient.simulatedException = null;
+        }
+
+        // Verify that the next same call also fails with a special delay exception.
+        try {
+            keySupplier.getKeySecret("k1", cookieSupplier);
+            fail("Expected exception not thrown");
+        } catch (SolrException e) {
+            // Expected.
+            assertEquals(SolrException.ErrorCode.SERVICE_UNAVAILABLE.code, e.code());
+        }
+
+        // Verify that any IO exception is propagated.
+        timeNs.addAndGet(KmsKeySupplier.FAILED_DECRYPT_DELAY_NS);
+        try {
+            kmsClient.simulatedException = new IOException("Test");
+            keySupplier.getKeySecret("k1", cookieSupplier);
+            fail("Expected exception not thrown");
+        } catch (RuntimeException e) {
+            // Expected.
+            assertEquals(kmsClient.simulatedException, e.getCause());
+            kmsClient.simulatedException = null;
+        }
+
+        assertKeyDecryptMetrics(3, 3);
+    }
+
+    @Test
+    public void testDelayAfterFailedDecrypt() throws Exception {
+        AtomicLong timeNs = new AtomicLong(System.nanoTime());
+        keySupplierFactory.setTicker(timeNs::get);
+        keySupplier = keySupplierFactory.create();
+
+        // When a call to KMS decrypt fails with an exception.
+        try {
+            kmsClient.simulatedException = new Exception("Test");
+            keySupplier.getKeySecret("k1", cookieSupplier);
+            fail("Expected exception not thrown");
+        } catch (RuntimeException e) {
+            // Expected.
+            assertEquals(kmsClient.simulatedException, e.getCause());
+            kmsClient.simulatedException = null;
+        }
+
+        // Then the next same call also fails with a special delay exception.
+        try {
+            keySupplier.getKeySecret("k1", cookieSupplier);
+            fail("Expected exception not thrown");
+        } catch (SolrException e) {
+            // Expected.
+            assertEquals(SolrException.ErrorCode.SERVICE_UNAVAILABLE.code, e.code());
+        }
+
+        // And when the delay expires.
+        timeNs.addAndGet(KmsKeySupplier.FAILED_DECRYPT_DELAY_NS);
+
+        // Then the same call succeeds.
+        verifyCacheMiss("k1");
+        verifyCacheHit("k1");
+
+        assertKeyDecryptMetrics(2, 1);
+    }
+
+    private void verifyCacheMiss(String key) throws Exception {
+        cookieSupplier.processedKeyIds.clear();
+        kmsClient.processedKeyIds.clear();
+        byte[] keySecret = keySupplier.getKeySecret(key, cookieSupplier);
+        assertEquals(SECRETS.get(key), secret(keySecret));
+        assertEquals(List.of(key), cookieSupplier.processedKeyIds);
+        assertEquals(List.of(key), kmsClient.processedKeyIds);
+    }
+
+    private void verifyCacheHit(String key) throws Exception {
+        cookieSupplier.processedKeyIds.clear();
+        kmsClient.processedKeyIds.clear();
+        byte[] keySecret = keySupplier.getKeySecret(key, cookieSupplier);
+        assertEquals(SECRETS.get(key), secret(keySecret));
+        assertEquals(List.of(), cookieSupplier.processedKeyIds);
+        assertEquals(List.of(), kmsClient.processedKeyIds);
+    }
+
+    private static String secret(byte[] bytes) {
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+
+    private void assertKeyDecryptMetrics(int expectedNumKeyDecrypt, int expectedNumFailedKeyDecrypt) {
+        assertEquals(expectedNumKeyDecrypt, keySupplierFactory.kmsMetrics.getNumKeyDecrypt());
+        assertEquals(expectedNumFailedKeyDecrypt, keySupplierFactory.kmsMetrics.getNumFailedKeyDecrypt());
+    }
+
+    private static class MockKmsClient implements KmsClient {
+
+        List<String> processedKeyIds = Collections.synchronizedList(new ArrayList<>());
+        volatile Exception simulatedException;
+        boolean closeCalled;
+
+        @Override
+        public byte[] decrypt(String keyId, String dataKeyBlob, String tenantId, String requestId)
+                throws Exception {
+            if (simulatedException != null) {
+                throw simulatedException;
+            }
+            assertTrue(dataKeyBlob.endsWith(BLOB_SUFFIX));
+            String expectedKeyId = dataKeyBlob.substring(0, dataKeyBlob.length() - BLOB_SUFFIX.length());
+            assertEquals(expectedKeyId, keyId);
+            processedKeyIds.add(keyId);
+            String secret = SECRETS.get(keyId);
+            if (secret == null) {
+                throw new NoSuchElementException();
+            }
+            return secret.getBytes(StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void close() {
+            closeCalled = true;
+        }
+    }
+
+    private static class CookieSupplier implements Function<String, Map<String, String>> {
+
+        List<String> processedKeyIds = Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public Map<String, String> apply(String keyId) {
+            processedKeyIds.add(keyId);
+            return Map.of(PARAM_ENCRYPTION_KEY_BLOB, keyId + BLOB_SUFFIX, PARAM_TENANT_ID, "tenantId");
+        }
+    }
+
+    private static class SpyingKeySupplierFactory extends KmsKeySupplier.Factory {
+
+        CoreContainer coreContainer;
+        MockKmsClient kmsClient = new MockKmsClient();
+        Duration cacheExpiration;
+        Ticker ticker = Ticker.systemTicker();
+        SpyingKmsMetrics kmsMetrics;
+        List<String> removedKeyIds = Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public void init(NamedList<?> args, CoreContainer coreContainer) {
+            coreContainer.getObjectCache().remove(KEY_SUPPLIER);
+            super.init(args, coreContainer);
+            this.coreContainer = coreContainer;
+        }
+
+        void setCacheExpiration(Duration cacheExpiration) {
+            this.cacheExpiration = cacheExpiration;
+        }
+
+        void setTicker(Ticker ticker) throws IOException {
+            this.ticker = ticker;
+            KeySupplier keySupplier = (KeySupplier) coreContainer.getObjectCache().remove(KEY_SUPPLIER);
+            keySupplier.close();
+            NamedList<String> args = new NamedList<>();
+            args.add(PARAM_KMS_CLIENT_FACTORY, TestingKmsClient.Factory.class.getName());
+            init(args, coreContainer);
+        }
+
+        @Override
+        protected Duration getKeyCacheExpiration(NamedList<?> args) {
+            if (cacheExpiration != null) {
+                return cacheExpiration;
+            }
+            return super.getKeyCacheExpiration(args);
+        }
+
+        @Override
+        protected KmsKeyCache createKeyCache(Duration cacheExpiration) {
+            return new SpyingKeyCache(cacheExpiration, ticker);
+        }
+
+        @Override
+        protected KeySupplier createKeySupplier(
+                KmsClient kmsClient,
+                KmsKeyCache kmsKeyCache,
+                KmsMetrics kmsMetrics) {
+            return new KmsKeySupplier(
+                    this.kmsClient,
+                    kmsKeyCache,
+                    new ConcurrentHashMap<>(),
+                    kmsMetrics,
+                    ticker);
+        }
+
+        @Override
+        protected KmsMetrics createEncryptionMetrics(CoreContainer coreContainer) {
+            return kmsMetrics = new SpyingKmsMetrics(coreContainer);
+        }
+
+        class SpyingKeyCache extends KmsKeyCache {
+
+            SpyingKeyCache(Duration cacheExpiration, Ticker ticker) {
+                super(cacheExpiration, ticker);
+            }
+
+            @Override
+            protected RemovalListener<String, byte[]> createCacheRemovalListener() {
+                RemovalListener<String, byte[]> delegate = super.createCacheRemovalListener();
+                return (keyId, secret, removalCause) -> {
+                    removedKeyIds.add(keyId);
+                    delegate.onRemoval(keyId, secret, removalCause);
+                };
+            }
+        }
+    }
+}
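
Note on testDelayAfterFailedDecrypt above: the test expects that, after a failed
decrypt, the same call is rejected until a delay has elapsed on an injected
ticker. The following is a minimal standalone sketch of that pattern, not the
KmsKeySupplier implementation. The class name DelayedRetryGate, its method
call, and the use of IllegalStateException are hypothetical and chosen only
for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical illustration of a per-key retry delay driven by an injectable ticker. */
class DelayedRetryGate {

    // Time (in ticker nanoseconds) of the last failure, per key.
    private final Map<String, Long> lastFailureNs = new ConcurrentHashMap<>();
    private final long delayNs;
    private final LongSupplier ticker;

    DelayedRetryGate(long delayNs, LongSupplier ticker) {
        this.delayNs = delayNs;
        this.ticker = ticker;
    }

    /** Calls the loader unless a previous failure for this key is still within the delay. */
    <V> V call(String key, Function<String, V> loader) {
        Long failedAt = lastFailureNs.get(key);
        if (failedAt != null && ticker.getAsLong() - failedAt < delayNs) {
            // Assumption: the real code signals this differently (the test expects a SolrException).
            throw new IllegalStateException("Retry delayed for key " + key);
        }
        try {
            V value = loader.apply(key);
            lastFailureNs.remove(key);
            return value;
        } catch (RuntimeException e) {
            // Record the failure time so that the next call is rejected without invoking the loader.
            lastFailureNs.put(key, ticker.getAsLong());
            throw e;
        }
    }
}
```

Passing the ticker as a LongSupplier lets a test advance time with an
AtomicLong, as the tests above do with timeNs::get, instead of sleeping.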
diff --git a/encryption/src/test/java/org/apache/solr/encryption/kms/SpyingKmsMetrics.java b/encryption/src/test/java/org/apache/solr/encryption/kms/SpyingKmsMetrics.java
new file mode 100644
index 0000000..0d2dd18
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/kms/SpyingKmsMetrics.java
@@ -0,0 +1,32 @@
+package org.apache.solr.encryption.kms;
+
+import org.apache.solr.core.CoreContainer;
+
+/**
+ * Spies on {@link KmsMetrics} and allows tests to reset them.
+ */
+public class SpyingKmsMetrics extends KmsMetrics {
+
+    public SpyingKmsMetrics(CoreContainer coreContainer) {
+        super(coreContainer);
+    }
+
+    public void reset() {
+        numFailedKmsInit.dec(numFailedKmsInit.getCount());
+        numKeyDecrypt.dec(numKeyDecrypt.getCount());
+        numFailedKeyDecrypt.dec(numFailedKeyDecrypt.getCount());
+        numSlowKeyDecrypt.dec(numSlowKeyDecrypt.getCount());
+    }
+
+    public long getNumFailedKmsInit() {
+        return numFailedKmsInit.getCount();
+    }
+
+    public long getNumKeyDecrypt() {
+        return numKeyDecrypt.getCount();
+    }
+
+    public long getNumFailedKeyDecrypt() {
+        return numFailedKeyDecrypt.getCount();
+    }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/kms/TestingKmsClient.java b/encryption/src/test/java/org/apache/solr/encryption/kms/TestingKmsClient.java
new file mode 100644
index 0000000..5666c0c
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/kms/TestingKmsClient.java
@@ -0,0 +1,60 @@
+package org.apache.solr.encryption.kms;
+
+import org.apache.solr.common.util.NamedList;
+
+import static org.apache.solr.encryption.EncryptionTestUtil.TENANT_ID;
+import static org.apache.solr.encryption.EncryptionTestUtil.generateMockKeyBlob;
+import static org.apache.solr.encryption.TestingKeySupplier.KEY_SECRETS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Mock {@link KmsClient} implementation for tests.
+ * <p>
+ * Extending classes could override {@link #decrypt} and {@link #generateKeyBlob} to run tests
+ * with real encryption keys.
+ */
+public class TestingKmsClient implements KmsClient {
+
+    public static volatile TestingKmsClient singleton;
+
+    protected TestingKmsClient() {
+        singleton = this;
+    }
+
+    @Override
+    public byte[] decrypt(String keyId, String keyBlob, String tenantId, String requestId) {
+        byte[] secret = KEY_SECRETS.get(keyId);
+        assertNotNull("No key defined for " + keyId, secret);
+        // See EncryptionTestUtil.KEY_BLOB structure.
+        assertTrue("Invalid key blob", keyBlob.startsWith("{\"keyId\":\"" + keyId + "\""));
+        assertEquals(TENANT_ID, tenantId);
+        assertNotNull(requestId);
+        assertNotEquals("", requestId);
+        return secret;
+    }
+
+    /**
+     * Generates the key blob.
+     */
+    public String generateKeyBlob(String keyId, String tenantId) throws Exception {
+        return generateMockKeyBlob(keyId);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Creates a {@link TestingKmsClient}.
+     */
+    public static class Factory implements KmsClient.Factory {
+
+        @Override
+        public KmsClient create(NamedList<?> args) {
+            return new TestingKmsClient();
+        }
+    }
+}
diff --git a/encryption/src/test/java/org/apache/solr/encryption/matcher/EventuallyMatcher.java b/encryption/src/test/java/org/apache/solr/encryption/matcher/EventuallyMatcher.java
new file mode 100644
index 0000000..8a28360
--- /dev/null
+++ b/encryption/src/test/java/org/apache/solr/encryption/matcher/EventuallyMatcher.java
@@ -0,0 +1,66 @@
+package org.apache.solr.encryption.matcher;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.function.Supplier;
+
+/**
+ * A Hamcrest matcher that retries a condition in a loop and fails only after a timeout has expired.
+ */
+public class EventuallyMatcher<T> extends TypeSafeMatcher<Supplier<T>> {
+
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(5000);
+    private static final long SLEEP_TIME_MS = 500;
+
+    private final Matcher<T> matcher;
+    private final Duration timeout;
+
+    private EventuallyMatcher(Matcher<T> matcher) {
+        this(matcher, DEFAULT_TIMEOUT);
+    }
+
+    private EventuallyMatcher(Matcher<T> matcher, Duration timeout) {
+        this.matcher = matcher;
+        this.timeout = timeout;
+    }
+
+    public static <T> EventuallyMatcher<T> eventually(Matcher<T> matcher) {
+        return new EventuallyMatcher<T>(matcher);
+    }
+
+    public static <T> EventuallyMatcher<T> eventually(Matcher<T> matcher, Duration timeout) {
+        return new EventuallyMatcher<T>(matcher, timeout);
+    }
+
+    @Override
+    protected boolean matchesSafely(Supplier<T> item) {
+        Instant start = Instant.now();
+        while (Duration.between(start, Instant.now()).compareTo(timeout) < 0) {
+            T val = item.get();
+            if (val != null && matcher.matches(val)) {
+                return true;
+            }
+            try {
+                Thread.sleep(SLEEP_TIME_MS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendDescriptionOf(matcher);
+    }
+
+    @Override
+    public void describeMismatchSafely(Supplier<T> item, Description mismatchDescription) {
+        // String.valueOf avoids a NullPointerException when the supplier returns null.
+        mismatchDescription.appendText(String.valueOf(item.get()));
+    }
+}
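
Note on EventuallyMatcher above: the matcher polls a supplier until the wrapped
matcher accepts the value or a timeout expires. The sketch below shows the same
polling idea with only the JDK, without Hamcrest. The class Eventually and its
pollInterval parameter are hypothetical and are not part of this commit.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical JDK-only illustration of the "eventually" polling pattern. */
class Eventually {

    /**
     * Re-evaluates the condition until it holds or the timeout expires.
     * Returns true as soon as the condition holds, false on timeout or interruption.
     */
    static boolean eventually(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadlineNs = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Compare by subtraction so that the check stays correct if nanoTime overflows.
            if (System.nanoTime() - deadlineNs >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up, as EventuallyMatcher does.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A caller could write, for example,
Eventually.eventually(() -> removedKeyIds.equals(List.of("k1")), Duration.ofSeconds(2), Duration.ofMillis(100)).
Unlike the matcher, this sketch evaluates the condition at least once even when
the timeout is zero.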
diff --git a/encryption/src/test/resources/configs/kms/schema.xml b/encryption/src/test/resources/configs/kms/schema.xml
new file mode 100644
index 0000000..72bc2a8
--- /dev/null
+++ b/encryption/src/test/resources/configs/kms/schema.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="int" class="${solr.tests.IntegerFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+    <fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+    <field name="multi" type="text" indexed="true" stored="false" multiValued="true"/>
+    <dynamicField name="*_s"  type="string"  indexed="true"  stored="true" />
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/encryption/src/test/resources/configs/kms/solrconfig.xml b/encryption/src/test/resources/configs/kms/solrconfig.xml
new file mode 100644
index 0000000..bde6922
--- /dev/null
+++ b/encryption/src/test/resources/configs/kms/solrconfig.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- solrconfig.xml that enables the index and update log encryption, and uses the KmsKeySupplier. -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+  </requestHandler>
+
+  <!-- EncryptionDirectoryFactory opens index files with EncryptionDirectory. -->
+  <directoryFactory name="DirectoryFactory"
+                    class="org.apache.solr.encryption.EncryptionDirectoryFactory">
+    <!-- KeySupplier provides encryption keys. -->
+    <str name="keySupplierFactory">org.apache.solr.encryption.kms.KmsKeySupplier$Factory</str>
+    <str name="kmsClientFactory">org.apache.solr.encryption.kms.TestingKmsClient$Factory</str>
+    <!-- AesCtrEncrypter implements the encryption transformation AES/CTR/NoPadding. -->
+    <str name="encrypterFactory">org.apache.solr.encryption.crypto.LightAesCtrEncrypter$Factory</str>
+  </directoryFactory>
+
+  <!-- EncryptionUpdateHandler transfers the encryption key ids from a commit to the next. -->
+  <updateHandler class="${solr.updateHandler:org.apache.solr.encryption.EncryptionUpdateHandler}">
+    <!-- EncryptionUpdateLog encrypts transaction logs if the index is encrypted. -->
+    <updateLog class="${solr.updateLog:org.apache.solr.encryption.EncryptionUpdateLog}"/>
+  </updateHandler>
+
+  <!-- Encryption handler can be called to trigger the encryption of the index and update log. -->
+  <requestHandler name="/admin/encrypt" class="org.apache.solr.encryption.kms.KmsEncryptionRequestHandler"/>
+
+  <indexConfig>
+    <mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
+
+    <!-- Chain of MergePolicy factories:
+         - EncryptionMergePolicy detects when a force-merge is triggered with a special max
+           number of segments equal to Integer.MAX_VALUE, in this case it merges (rewrites)
+           individually each segment which is not encrypted with the latest active key id.
+         - TieredMergePolicy is the standard merge policy.
+    -->
+    <mergePolicyFactory class="org.apache.solr.encryption.EncryptionMergePolicyFactory">
+      <str name="wrapped.prefix">delegate</str>
+      <str name="delegate.class">org.apache.solr.index.TieredMergePolicyFactory</str>
+    </mergePolicyFactory>
+  </indexConfig>
+
+  <backup>
+    <!-- Encryption backup repository delegates to another backup repository. -->
+    <repository name="encryptionBackupRepository" class="org.apache.solr.encryption.EncryptionBackupRepository" default="true">
+      <str name="delegateRepoName">localBackupRepository</str>
+    </repository>
+    <repository name="localBackupRepository" class="org.apache.solr.core.backup.repository.LocalFileSystemRepository">
+      <str name="location">/solr/backup_data</str>
+    </repository>
+  </backup>
+
+</config>
