This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4547-phase1
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-4547-phase1 by this push:
     new 8f20d8f73 TIKA-4557 create tika-proto in its own module - **Phase 1: Foundation (TIKA-4547)**
       - Implement StateStore abstraction
       - Refactor ExpiringFetcherStore
       - Add InMemoryStateStore
       - Update documentation
8f20d8f73 is described below

commit 8f20d8f734945a84deab1dddb95ee68e85688d99
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Mon Dec 8 10:14:28 2025 -0600

    TIKA-4557 create tika-proto in its own module - **Phase 1: Foundation (TIKA-4547)**
    - Implement StateStore abstraction
    - Refactor ExpiringFetcherStore
    - Add InMemoryStateStore
    - Update documentation
---
 tika-grpc/pom.xml                                  |   5 +
 .../tika/pipes/grpc/ExpiringFetcherStore.java      | 103 ------------
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java |  48 ++++--
 ...FetcherStoreTest.java => FetcherStoreTest.java} |  42 ++---
 .../tika/pipes/api/statestore/StateStore.java      | 124 ++++++++++++++
 .../pipes/api/statestore/StateStoreException.java  |  35 ++++
 .../pipes/api/statestore/StateStoreFactory.java    |  28 +++
 .../pipes/core/statestore/ComponentSerializer.java | 112 ++++++++++++
 .../pipes/core/statestore/InMemoryStateStore.java  | 130 ++++++++++++++
 .../core/statestore/InMemoryStateStoreFactory.java |  53 ++++++
 .../pipes/core/statestore/StateStoreManager.java   | 109 ++++++++++++
 .../core/statestore/InMemoryStateStoreTest.java    | 187 +++++++++++++++++++++
 12 files changed, 831 insertions(+), 145 deletions(-)

diff --git a/tika-grpc/pom.xml b/tika-grpc/pom.xml
index fd4141737..387ac3000 100644
--- a/tika-grpc/pom.xml
+++ b/tika-grpc/pom.xml
@@ -218,6 +218,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-pipes-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.tika</groupId>
       <artifactId>tika-pipes-http</artifactId>
diff --git 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java 
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
deleted file mode 100644
index 70553d771..000000000
--- 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.grpc;
-
-import java.time.Instant;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.plugins.ExtensionConfig;
-
-public class ExpiringFetcherStore implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ExpiringFetcherStore.class);
-    public static final long EXPIRE_JOB_INITIAL_DELAY = 1L;
-    private final Map<String, Fetcher> fetchers = 
Collections.synchronizedMap(new HashMap<>());
-    private final Map<String, ExtensionConfig> fetcherConfigs = 
Collections.synchronizedMap(new HashMap<>());
-    private final Map<String, Instant> fetcherLastAccessed = 
Collections.synchronizedMap(new HashMap<>());
-
-    private final ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
-
-    public ExpiringFetcherStore(int expireAfterSeconds, int 
checkForExpiredFetchersDelaySeconds) {
-        executorService.scheduleAtFixedRate(() -> {
-            Set<String> expired = new HashSet<>();
-            for (String fetcherPluginId : fetchers.keySet()) {
-                Instant lastAccessed = 
fetcherLastAccessed.get(fetcherPluginId);
-                if (lastAccessed == null) {
-                    LOG.error("Detected a fetcher with no last access time. 
FetcherName={}", fetcherPluginId);
-                    expired.add(fetcherPluginId);
-                } else if (Instant
-                        .now()
-                        
.isAfter(lastAccessed.plusSeconds(expireAfterSeconds))) {
-                    LOG.info("Detected stale fetcher {} hasn't been accessed 
in {} seconds. " + "Deleting.", fetcherPluginId, Instant
-                            .now()
-                            .getEpochSecond() - lastAccessed.getEpochSecond());
-                    expired.add(fetcherPluginId);
-                }
-            }
-            for (String expiredFetcherId : expired) {
-                deleteFetcher(expiredFetcherId);
-            }
-        }, EXPIRE_JOB_INITIAL_DELAY, checkForExpiredFetchersDelaySeconds, 
TimeUnit.SECONDS);
-    }
-
-    public boolean deleteFetcher(String fetcherPluginId) {
-        boolean success = fetchers.remove(fetcherPluginId) != null;
-        fetcherConfigs.remove(fetcherPluginId);
-        fetcherLastAccessed.remove(fetcherPluginId);
-        return success;
-    }
-
-    public Map<String, Fetcher> getFetchers() {
-        return fetchers;
-    }
-
-    public Map<String, ExtensionConfig> getFetcherConfigs() {
-        return fetcherConfigs;
-    }
-
-    /**
-     * This method will get the fetcher, but will also log the access the 
fetcher as having
-     * been accessed. This prevents the scheduled job from removing the stale 
fetcher.
-     */
-    public <T extends Fetcher> T getFetcherAndLogAccess(String 
fetcherPluginId) {
-        fetcherLastAccessed.put(fetcherPluginId, Instant.now());
-        return (T) fetchers.get(fetcherPluginId);
-    }
-
-    public <T extends Fetcher> void createFetcher(T fetcher, ExtensionConfig 
config) {
-        String id = fetcher.getExtensionConfig().id();
-
-        fetchers.put(id, fetcher);
-        fetcherConfigs.put(id, config);
-        getFetcherAndLogAccess(id);
-    }
-
-    @Override
-    public void close() {
-        executorService.shutdownNow();
-    }
-}
diff --git 
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java 
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index cfea3203c..bf45eca0d 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -78,9 +78,13 @@ import org.apache.tika.pipes.api.PipesResult;
 import org.apache.tika.pipes.api.emitter.EmitKey;
 import org.apache.tika.pipes.api.fetcher.FetchKey;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.core.FetcherStore;
 import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.statestore.StateStoreManager;
 import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
 import org.apache.tika.utils.XMLReaderUtils;
 
 class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
@@ -97,7 +101,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
      */
     PipesConfig pipesConfig;
     PipesClient pipesClient;
-    ExpiringFetcherStore expiringFetcherStore;
+    FetcherStore fetcherStore;
+    StateStore stateStore;
 
     String tikaConfigPath;
 
@@ -114,10 +119,23 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             tikaConfigFile = tmpTikaConfigFile;
             tikaConfigPath = tikaConfigFile.getAbsolutePath();
         }
-        pipesConfig = 
TikaLoader.load(tikaConfigFile.toPath()).configs().load("pipes", 
PipesConfig.class);
+
+        TikaLoader tikaLoader = TikaLoader.load(tikaConfigFile.toPath());
+        pipesConfig = tikaLoader.configs().load("pipes", PipesConfig.class);
         pipesClient = new PipesClient(pipesConfig);
 
-        expiringFetcherStore = new 
ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(),
+        // Load StateStore for distributed state management
+        try {
+            TikaPluginManager pluginManager = 
TikaPluginManager.load(tikaLoader.getConfig());
+            stateStore = StateStoreManager.load(pluginManager, 
tikaLoader.getConfig());
+            LOG.info("Loaded StateStore: {}", stateStore.getClass().getName());
+        } catch (Exception e) {
+            LOG.warn("Failed to load StateStore from config, using default 
in-memory store", e);
+            stateStore = StateStoreManager.createDefault();
+        }
+
+        fetcherStore = new FetcherStore(stateStore,
+                pipesConfig.getStaleFetcherTimeoutSeconds(),
                 pipesConfig.getStaleFetcherDelaySeconds());
         this.tikaConfigPath = tikaConfigPath;
         try {
@@ -139,10 +157,10 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         for (int i = 0; i < fetchersElement.getChildNodes().getLength(); ++i) {
             
fetchersElement.removeChild(fetchersElement.getChildNodes().item(i));
         }
-        for (var fetcherEntry : expiringFetcherStore.getFetchers().entrySet()) 
{
+        for (var fetcherEntry : fetcherStore.getFetchers().entrySet()) {
             Fetcher fetcherObject = fetcherEntry.getValue();
             Map<String, Object> fetcherConfigParams = 
OBJECT_MAPPER.convertValue(
-                    
expiringFetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
+                    
fetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
                     new TypeReference<>() {
                     });
             Element fetcher = tikaConfigDoc.createElement("fetcher");
@@ -219,7 +237,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     private void fetchAndParseImpl(FetchAndParseRequest request,
                                    StreamObserver<FetchAndParseReply> 
responseObserver) {
         Fetcher fetcher =
-                
expiringFetcherStore.getFetcherAndLogAccess(request.getFetcherId());
+                fetcherStore.getFetcherAndLogAccess(request.getFetcherId());
         if (fetcher == null) {
             throw new RuntimeException(
                     "Could not find fetcher with name " + 
request.getFetcherId());
@@ -230,7 +248,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             String additionalFetchConfigJson = 
request.getAdditionalFetchConfigJson();
             if (StringUtils.isNotBlank(additionalFetchConfigJson)) {
                 // The fetch and parse has the option to specify additional 
configuration
-                ExtensionConfig abstractConfig = expiringFetcherStore
+                ExtensionConfig abstractConfig = fetcherStore
                         .getFetcherConfigs()
                         .get(fetcher.getExtensionConfig().id());
                 ConfigContainer configContainer = new ConfigContainer();
@@ -302,12 +320,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
                 Initializable initializable = (Initializable) abstractFetcher;
                 initializable.initialize(tikaParamsMap);
             }
-            if (expiringFetcherStore.deleteFetcher(name)) {
+            if (fetcherStore.deleteFetcher(name)) {
                 LOG.info("Updating fetcher {}", name);
             } else {
                 LOG.info("Creating new fetcher {}", name);
             }
-            expiringFetcherStore.createFetcher(abstractFetcher, configObject);
+            fetcherStore.createFetcher(abstractFetcher, configObject);
         } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException |
                  InvocationTargetException | NoSuchMethodException | 
TikaConfigException e) {
             throw new RuntimeException(e);
@@ -336,8 +354,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
                            StreamObserver<GetFetcherReply> responseObserver) {
         GetFetcherReply.Builder getFetcherReply = GetFetcherReply.newBuilder();
         ExtensionConfig abstractConfig =
-                
expiringFetcherStore.getFetcherConfigs().get(request.getFetcherId());
-        Fetcher abstractFetcher = 
expiringFetcherStore.getFetchers().get(request.getFetcherId());
+                fetcherStore.getFetcherConfigs().get(request.getFetcherId());
+        Fetcher abstractFetcher = 
fetcherStore.getFetchers().get(request.getFetcherId());
         if (abstractFetcher == null || abstractConfig == null) {
             
responseObserver.onError(StatusProto.toStatusException(notFoundStatus(request.getFetcherId())));
             return;
@@ -355,7 +373,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     public void listFetchers(ListFetchersRequest request,
                              StreamObserver<ListFetchersReply> 
responseObserver) {
         ListFetchersReply.Builder listFetchersReplyBuilder = 
ListFetchersReply.newBuilder();
-        for (Map.Entry<String, ExtensionConfig> fetcherConfig : 
expiringFetcherStore.getFetcherConfigs()
+        for (Map.Entry<String, ExtensionConfig> fetcherConfig : 
fetcherStore.getFetcherConfigs()
                                                                                
     .entrySet()) {
             GetFetcherReply.Builder replyBuilder = 
saveFetcherReply(fetcherConfig);
             
listFetchersReplyBuilder.addGetFetcherReplies(replyBuilder.build());
@@ -367,9 +385,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     private GetFetcherReply.Builder saveFetcherReply(
             Map.Entry<String, ExtensionConfig> fetcherConfig) {
         Fetcher abstractFetcher =
-                expiringFetcherStore.getFetchers().get(fetcherConfig.getKey());
+                fetcherStore.getFetchers().get(fetcherConfig.getKey());
         ExtensionConfig abstractConfig =
-                
expiringFetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
+                fetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
         GetFetcherReply.Builder replyBuilder =
                 
GetFetcherReply.newBuilder().setFetcherClass(abstractFetcher.getClass().getName())
                         
.setFetcherId(abstractFetcher.getExtensionConfig().id());
@@ -417,6 +435,6 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     }
 
     private boolean deleteFetcher(String id) {
-        return expiringFetcherStore.deleteFetcher(id);
+        return fetcherStore.deleteFetcher(id);
     }
 }
diff --git 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
 b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/FetcherStoreTest.java
similarity index 60%
rename from 
tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
rename to 
tika-grpc/src/test/java/org/apache/tika/pipes/grpc/FetcherStoreTest.java
index 21356a5ca..769a9f93b 100644
--- 
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
+++ b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/FetcherStoreTest.java
@@ -16,33 +16,31 @@
  */
 package org.apache.tika.pipes.grpc;
 
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-import java.io.IOException;
 import java.io.InputStream;
 import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.core.FetcherStore;
+import org.apache.tika.pipes.core.statestore.StateStoreManager;
 import org.apache.tika.plugins.ExtensionConfig;
 
-class ExpiringFetcherStoreTest {
-
-    private static final ObjectMapper MAPPER = new ObjectMapper();
-
+class FetcherStoreTest {
     @Test
-    void createFetcher() throws Exception {
-        try (ExpiringFetcherStore expiringFetcherStore = new 
ExpiringFetcherStore(1, 5)) {
+    void createAndDeleteFetcher() throws Exception {
+        StateStore stateStore = StateStoreManager.createDefault();
+        try (FetcherStore fetcherStore = new FetcherStore(stateStore, 2000L, 
1000L)) {
             Fetcher fetcher = new Fetcher() {
                 @Override
-                public InputStream fetch(String fetchKey, Metadata metadata, 
ParseContext parseContext) throws TikaException, IOException {
+                public InputStream fetch(String fetchKey, Metadata metadata,
+                                         ParseContext parseContext) {
                     return null;
                 }
 
@@ -51,22 +49,12 @@ class ExpiringFetcherStoreTest {
                     return new ExtensionConfig("nick", "factory-plugin-id", 
"{}");
                 }
             };
-            expiringFetcherStore.createFetcher(fetcher, 
fetcher.getExtensionConfig());
-
-            Assertions.assertNotNull(expiringFetcherStore
-                    .getFetchers()
-                    .get(fetcher.getExtensionConfig().id()));
-
-            Awaitility
-                    .await()
-                    .atMost(Duration.ofSeconds(60))
-                    .until(() -> expiringFetcherStore
-                            .getFetchers()
-                            .get(fetcher.getExtensionConfig().id()) == null);
-
-            assertNull(expiringFetcherStore
-                    .getFetcherConfigs()
+            fetcherStore.createFetcher(fetcher, fetcher.getExtensionConfig());
+            Assertions.assertNotNull(fetcherStore.getFetchers()
                     .get(fetcher.getExtensionConfig().id()));
+            Awaitility.await().atMost(Duration.ofSeconds(10))
+                    .pollInterval(1000L, TimeUnit.MILLISECONDS)
+                    .until(() -> 
fetcherStore.getFetchers().get(fetcher.getExtensionConfig().id()) == null);
         }
     }
 }
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStore.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStore.java
new file mode 100644
index 000000000..e33bbe859
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStore.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.statestore;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.tika.plugins.TikaExtension;
+
+/**
+ * StateStore provides a key-value store abstraction for distributed state management
+ * in Tika Pipes. This enables sharing of Fetcher, Emitter, and PipesIterator configurations
+ * across multiple nodes in a cluster.
+ * <p>
+ * Implementations can be in-memory (for single-node deployments) or backed by
+ * distributed systems like Apache Ignite, Redis, or Hazelcast.
+ * <p>
+ * Keys use namespace prefixes to organize different types of data:
+ * <ul>
+ *   <li>fetcher:* - Fetcher configurations and instances</li>
+ *   <li>emitter:* - Emitter configurations and instances</li>
+ *   <li>pipesiterator:* - PipesIterator configurations</li>
+ * </ul>
+ */
+public interface StateStore extends TikaExtension, AutoCloseable {
+
+    /**
+     * Store a value associated with the given key.
+     *
+     * @param key the key to store the value under
+     * @param value the byte array value to store
+     * @throws StateStoreException if the operation fails
+     */
+    void put(String key, byte[] value) throws StateStoreException;
+
+    /**
+     * Retrieve the value associated with the given key.
+     *
+     * @param key the key to retrieve
+     * @return the byte array value, or null if the key doesn't exist
+     * @throws StateStoreException if the operation fails
+     */
+    byte[] get(String key) throws StateStoreException;
+
+    /**
+     * Delete the value associated with the given key.
+     *
+     * @param key the key to delete
+     * @return true if the key was deleted, false if it didn't exist
+     * @throws StateStoreException if the operation fails
+     */
+    boolean delete(String key) throws StateStoreException;
+
+    /**
+     * List all keys currently in the store.
+     *
+     * @return a set of all keys
+     * @throws StateStoreException if the operation fails
+     */
+    Set<String> listKeys() throws StateStoreException;
+
+    /**
+     * Update the last access time for the given key.
+     * This is used for expiration tracking.
+     *
+     * @param key the key to update
+     * @param accessTime the access time to record
+     * @throws StateStoreException if the operation fails
+     */
+    void updateAccessTime(String key, Instant accessTime) throws 
StateStoreException;
+
+    /**
+     * Get the last access time for the given key.
+     *
+     * @param key the key to query
+     * @return the last access time, or null if not tracked or key doesn't exist
+     * @throws StateStoreException if the operation fails
+     */
+    Instant getAccessTime(String key) throws StateStoreException;
+
+    /**
+     * Initialize the state store with the given configuration.
+     * This is called once before the store is used.
+     *
+     * @param config configuration parameters for the store
+     * @throws StateStoreException if initialization fails
+     */
+    void initialize(Map<String, String> config) throws StateStoreException;
+
+    /**
+     * Check if a key exists in the store.
+     *
+     * @param key the key to check
+     * @return true if the key exists, false otherwise
+     * @throws StateStoreException if the operation fails
+     */
+    default boolean containsKey(String key) throws StateStoreException {
+        return get(key) != null;
+    }
+
+    /**
+     * Close the state store and release any resources.
+     * After this is called, the store should not be used.
+     *
+     * @throws StateStoreException if closing fails
+     */
+    @Override
+    void close() throws StateStoreException;
+}
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreException.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreException.java
new file mode 100644
index 000000000..983fb1e7e
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.statestore;
+
+/**
+ * Exception thrown when operations on a StateStore fail.
+ */
+public class StateStoreException extends Exception {
+
+    public StateStoreException(String message) {
+        super(message);
+    }
+
+    public StateStoreException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public StateStoreException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreFactory.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreFactory.java
new file mode 100644
index 000000000..ee386afbe
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.statestore;
+
+import org.apache.tika.plugins.TikaExtensionFactory;
+
+/**
+ * Factory interface for creating StateStore instances.
+ * Implementations should be annotated with @Extension to be discovered
+ * by the PF4J plugin system.
+ */
+public interface StateStoreFactory extends TikaExtensionFactory<StateStore> {
+
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/ComponentSerializer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/ComponentSerializer.java
new file mode 100644
index 000000000..9a805927e
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/ComponentSerializer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * Helper class for serializing and deserializing components to/from StateStore.
+ * Uses JSON serialization via Jackson ObjectMapper.
+ */
+public class ComponentSerializer {
+
+    private final ObjectMapper objectMapper;
+
+    public ComponentSerializer(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+    }
+
+    public ComponentSerializer() {
+        this(new ObjectMapper());
+    }
+
+    /**
+     * Serialize an ExtensionConfig to bytes.
+     *
+     * @param config the config to serialize
+     * @return byte array representation
+     * @throws StateStoreException if serialization fails
+     */
+    public byte[] serializeConfig(ExtensionConfig config) throws 
StateStoreException {
+        try {
+            return 
objectMapper.writeValueAsString(config).getBytes(StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to serialize 
ExtensionConfig", e);
+        }
+    }
+
+    /**
+     * Deserialize an ExtensionConfig from bytes.
+     *
+     * @param data the byte array to deserialize
+     * @return the deserialized ExtensionConfig
+     * @throws StateStoreException if deserialization fails
+     */
+    public ExtensionConfig deserializeConfig(byte[] data) throws 
StateStoreException {
+        if (data == null) {
+            return null;
+        }
+        try {
+            String json = new String(data, StandardCharsets.UTF_8);
+            return objectMapper.readValue(json, ExtensionConfig.class);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to deserialize 
ExtensionConfig", e);
+        }
+    }
+
+    /**
+     * Serialize a generic object to bytes.
+     *
+     * @param obj the object to serialize
+     * @return byte array representation
+     * @throws StateStoreException if serialization fails
+     */
+    public byte[] serialize(Object obj) throws StateStoreException {
+        try {
+            return 
objectMapper.writeValueAsString(obj).getBytes(StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to serialize object", e);
+        }
+    }
+
+    /**
+     * Deserialize a generic object from bytes.
+     *
+     * @param data the byte array to deserialize
+     * @param clazz the class to deserialize to
+     * @param <T> the type parameter
+     * @return the deserialized object
+     * @throws StateStoreException if deserialization fails
+     */
+    public <T> T deserialize(byte[] data, Class<T> clazz) throws 
StateStoreException {
+        if (data == null) {
+            return null;
+        }
+        try {
+            String json = new String(data, StandardCharsets.UTF_8);
+            return objectMapper.readValue(json, clazz);
+        } catch (IOException e) {
+            throw new StateStoreException("Failed to deserialize object", e);
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStore.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStore.java
new file mode 100644
index 000000000..305172ad2
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * In-memory implementation of StateStore using ConcurrentHashMap.
+ * This is the default implementation used for single-node deployments.
+ * <p>
+ * This implementation provides thread-safe operations but does not
+ * persist state across restarts or share state across JVM instances.
+ */
+public class InMemoryStateStore implements StateStore {
+
+    private final ConcurrentHashMap<String, byte[]> data = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Instant> accessTimes = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private ExtensionConfig extensionConfig;
+
+    @Override
+    public void put(String key, byte[] value) throws StateStoreException {
+        checkNotClosed();
+        if (key == null) {
+            throw new StateStoreException("Key cannot be null");
+        }
+        if (value == null) {
+            throw new StateStoreException("Value cannot be null");
+        }
+        data.put(key, value);
+    }
+
+    @Override
+    public byte[] get(String key) throws StateStoreException {
+        checkNotClosed();
+        if (key == null) {
+            throw new StateStoreException("Key cannot be null");
+        }
+        return data.get(key);
+    }
+
+    @Override
+    public boolean delete(String key) throws StateStoreException {
+        checkNotClosed();
+        if (key == null) {
+            throw new StateStoreException("Key cannot be null");
+        }
+        accessTimes.remove(key);
+        return data.remove(key) != null;
+    }
+
+    @Override
+    public Set<String> listKeys() throws StateStoreException {
+        checkNotClosed();
+        // Return union of both data keys and accessTimes keys
+        // This allows expiration logic to find both config and access time 
entries
+        Set<String> allKeys = new java.util.HashSet<>(data.keySet());
+        allKeys.addAll(accessTimes.keySet());
+        return allKeys;
+    }
+
+    @Override
+    public void updateAccessTime(String key, Instant accessTime) throws 
StateStoreException {
+        checkNotClosed();
+        if (key == null) {
+            throw new StateStoreException("Key cannot be null");
+        }
+        if (accessTime == null) {
+            throw new StateStoreException("Access time cannot be null");
+        }
+        accessTimes.put(key, accessTime);
+    }
+
+    @Override
+    public Instant getAccessTime(String key) throws StateStoreException {
+        checkNotClosed();
+        if (key == null) {
+            throw new StateStoreException("Key cannot be null");
+        }
+        return accessTimes.get(key);
+    }
+
+    @Override
+    public void initialize(Map<String, String> config) throws 
StateStoreException {
+        // No initialization needed for in-memory store
+    }
+
+    @Override
+    public void close() throws StateStoreException {
+        closed = true;
+        data.clear();
+        accessTimes.clear();
+    }
+
+    @Override
+    public ExtensionConfig getExtensionConfig() {
+        return extensionConfig;
+    }
+
+    public void setExtensionConfig(ExtensionConfig extensionConfig) {
+        this.extensionConfig = extensionConfig;
+    }
+
+    private void checkNotClosed() throws StateStoreException {
+        if (closed) {
+            throw new StateStoreException("StateStore has been closed");
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreFactory.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreFactory.java
new file mode 100644
index 000000000..bee273292
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.io.IOException;
+
+import org.pf4j.Extension;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.pipes.api.statestore.StateStoreFactory;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * {@link StateStoreFactory} that produces {@link InMemoryStateStore}
+ * instances and reports the name {@code "in-memory"}.
+ */
+@Extension
+public class InMemoryStateStoreFactory implements StateStoreFactory {
+
+    @Override
+    public String getName() {
+        return "in-memory";
+    }
+
+    /**
+     * Creates a new in-memory store, attaches the given extension config to
+     * it, and initializes it.
+     *
+     * @param extensionConfig the config handed to the new store
+     * @return the initialized store
+     * @throws TikaConfigException if the store's initialization fails
+     */
+    @Override
+    public StateStore buildExtension(ExtensionConfig extensionConfig)
+            throws IOException, TikaConfigException {
+        final InMemoryStateStore inMemoryStore = new InMemoryStateStore();
+        inMemoryStore.setExtensionConfig(extensionConfig);
+        try {
+            // The in-memory store takes no settings; initialize is still
+            // called, with a null config map.
+            inMemoryStore.initialize(null);
+            return inMemoryStore;
+        } catch (StateStoreException e) {
+            throw new TikaConfigException("Failed to initialize InMemoryStateStore", e);
+        }
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/StateStoreManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/StateStoreManager.java
new file mode 100644
index 000000000..09e5a31c7
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/StateStoreManager.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.pf4j.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.pipes.api.statestore.StateStoreFactory;
+import org.apache.tika.plugins.PluginComponentLoader;
+
+/**
+ * Manager for loading StateStore implementations from configuration.
+ * This follows the same pattern as FetcherManager and EmitterManager.
+ */
+public class StateStoreManager {
+
+    public static final String CONFIG_KEY = "stateStore";
+    private static final Logger LOG = LoggerFactory.getLogger(StateStoreManager.class);
+
+    /**
+     * Load a StateStore from the given configuration.
+     * If the root node has no {@code stateStore} entry, returns an
+     * InMemoryStateStore by default. If several stores are configured, the
+     * first one in the loaded map is used and a warning is logged.
+     *
+     * @param pluginManager the plugin manager for loading factories
+     * @param tikaJsonConfig the Tika configuration
+     * @return a configured StateStore instance
+     * @throws IOException declared for callers; failures raised while loading
+     *         are reported as TikaConfigException
+     * @throws TikaConfigException if configuration is invalid, yields no
+     *         store, or loading fails
+     */
+    public static StateStore load(PluginManager pluginManager, TikaJsonConfig tikaJsonConfig)
+            throws IOException, TikaConfigException {
+        JsonNode stateStoreNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
+
+        if (stateStoreNode == null) {
+            LOG.info("No state store configured, using default InMemoryStateStore");
+            return createDefault();
+        }
+
+        try {
+            Map<String, StateStore> stores = PluginComponentLoader.loadInstances(
+                    pluginManager, StateStoreFactory.class, stateStoreNode);
+
+            if (stores.isEmpty()) {
+                throw new TikaConfigException(
+                        "No state store instances loaded from configuration");
+            }
+
+            // Resolve the first entry once so the warning and the returned
+            // store are guaranteed to refer to the same entry.
+            Map.Entry<String, StateStore> first = stores.entrySet().iterator().next();
+            if (stores.size() > 1) {
+                LOG.warn("Multiple state stores configured, using the first one: {}",
+                        first.getKey());
+            }
+
+            // Return the first (and typically only) state store
+            StateStore store = first.getValue();
+            LOG.info("Loaded state store: {}", store.getClass().getName());
+            return store;
+
+        } catch (TikaConfigException e) {
+            // Already the right type with a specific message; re-wrapping it
+            // would only bury that message under a generic one.
+            throw e;
+        } catch (Exception e) {
+            throw new TikaConfigException("Failed to load state store from configuration", e);
+        }
+    }
+
+    /**
+     * Create a default in-memory state store.
+     * This is useful for testing and single-node deployments.
+     *
+     * @return a new InMemoryStateStore instance
+     * @throws TikaConfigException if initialization fails
+     */
+    public static StateStore createDefault() throws TikaConfigException {
+        InMemoryStateStore store = new InMemoryStateStore();
+        try {
+            store.initialize(null);
+        } catch (StateStoreException e) {
+            throw new TikaConfigException("Failed to initialize default InMemoryStateStore", e);
+        }
+        return store;
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreTest.java
 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreTest.java
new file mode 100644
index 000000000..a9b943628
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Set;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+
+public class InMemoryStateStoreTest {
+
+    private StateStore stateStore;
+
+    @BeforeEach
+    public void setUp() throws StateStoreException {
+        stateStore = new InMemoryStateStore();
+        stateStore.initialize(null);
+    }
+
+    @AfterEach
+    public void tearDown() throws StateStoreException {
+        if (stateStore != null) {
+            stateStore.close();
+        }
+    }
+
+    @Test
+    public void testPutAndGet() throws StateStoreException {
+        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+        stateStore.put("test-key", value);
+
+        byte[] retrieved = stateStore.get("test-key");
+        assertNotNull(retrieved);
+        assertArrayEquals(value, retrieved);
+    }
+
+    @Test
+    public void testGetNonExistent() throws StateStoreException {
+        byte[] retrieved = stateStore.get("non-existent");
+        assertNull(retrieved);
+    }
+
+    @Test
+    public void testDelete() throws StateStoreException {
+        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+        stateStore.put("test-key", value);
+
+        assertTrue(stateStore.delete("test-key"));
+        assertNull(stateStore.get("test-key"));
+    }
+
+    @Test
+    public void testDeleteNonExistent() throws StateStoreException {
+        assertFalse(stateStore.delete("non-existent"));
+    }
+
+    @Test
+    public void testListKeys() throws StateStoreException {
+        stateStore.put("key1", "value1".getBytes(StandardCharsets.UTF_8));
+        stateStore.put("key2", "value2".getBytes(StandardCharsets.UTF_8));
+        stateStore.put("key3", "value3".getBytes(StandardCharsets.UTF_8));
+
+        Set<String> keys = stateStore.listKeys();
+        assertEquals(3, keys.size());
+        assertTrue(keys.contains("key1"));
+        assertTrue(keys.contains("key2"));
+        assertTrue(keys.contains("key3"));
+    }
+
+    @Test
+    public void testAccessTime() throws StateStoreException {
+        Instant now = Instant.now();
+        stateStore.updateAccessTime("test-key", now);
+
+        Instant retrieved = stateStore.getAccessTime("test-key");
+        assertNotNull(retrieved);
+        assertEquals(now, retrieved);
+    }
+
+    @Test
+    public void testAccessTimeNonExistent() throws StateStoreException {
+        Instant retrieved = stateStore.getAccessTime("non-existent");
+        assertNull(retrieved);
+    }
+
+    @Test
+    public void testAccessTimeDeletedWithKey() throws StateStoreException {
+        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+        Instant now = Instant.now();
+
+        stateStore.put("test-key", value);
+        stateStore.updateAccessTime("test-key", now);
+
+        stateStore.delete("test-key");
+
+        assertNull(stateStore.getAccessTime("test-key"));
+    }
+
+    @Test
+    public void testContainsKey() throws StateStoreException {
+        byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+        stateStore.put("test-key", value);
+
+        assertTrue(stateStore.containsKey("test-key"));
+        assertFalse(stateStore.containsKey("non-existent"));
+    }
+
+    @Test
+    public void testNullKeyThrows() {
+        assertThrows(StateStoreException.class, () -> stateStore.put(null, new 
byte[0]));
+        assertThrows(StateStoreException.class, () -> stateStore.get(null));
+        assertThrows(StateStoreException.class, () -> stateStore.delete(null));
+        assertThrows(StateStoreException.class,
+                () -> stateStore.updateAccessTime(null, Instant.now()));
+        assertThrows(StateStoreException.class, () -> 
stateStore.getAccessTime(null));
+    }
+
+    @Test
+    public void testNullValueThrows() {
+        assertThrows(StateStoreException.class, () -> stateStore.put("key", 
null));
+    }
+
+    @Test
+    public void testNullAccessTimeThrows() {
+        assertThrows(StateStoreException.class, () -> 
stateStore.updateAccessTime("key", null));
+    }
+
+    @Test
+    public void testOperationsAfterCloseThrow() throws StateStoreException {
+        stateStore.close();
+
+        assertThrows(StateStoreException.class,
+                () -> stateStore.put("key", 
"value".getBytes(StandardCharsets.UTF_8)));
+        assertThrows(StateStoreException.class, () -> stateStore.get("key"));
+        assertThrows(StateStoreException.class, () -> 
stateStore.delete("key"));
+        assertThrows(StateStoreException.class, () -> stateStore.listKeys());
+        assertThrows(StateStoreException.class,
+                () -> stateStore.updateAccessTime("key", Instant.now()));
+        assertThrows(StateStoreException.class, () -> 
stateStore.getAccessTime("key"));
+    }
+
+    @Test
+    public void testNamespacePrefixes() throws StateStoreException {
+        // Test that keys with different namespace prefixes are handled 
correctly
+        stateStore.put("fetcher:config:my-fetcher",
+                "fetcher-data".getBytes(StandardCharsets.UTF_8));
+        stateStore.put("emitter:config:my-emitter",
+                "emitter-data".getBytes(StandardCharsets.UTF_8));
+        stateStore.put("pipesiterator:my-iterator",
+                "iterator-data".getBytes(StandardCharsets.UTF_8));
+
+        Set<String> keys = stateStore.listKeys();
+        assertEquals(3, keys.size());
+
+        assertNotNull(stateStore.get("fetcher:config:my-fetcher"));
+        assertNotNull(stateStore.get("emitter:config:my-emitter"));
+        assertNotNull(stateStore.get("pipesiterator:my-iterator"));
+    }
+}

Reply via email to