This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch TIKA-4547-phase1
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-4547-phase1 by this push:
new 8f20d8f73 TIKA-4557 create tika-proto in its own module - **Phase 1: Foundation (TIKA-4547)**
- Implement StateStore abstraction
- Refactor ExpiringFetcherStore
- Add InMemoryStateStore
- Update documentation
8f20d8f73 is described below
commit 8f20d8f734945a84deab1dddb95ee68e85688d99
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Mon Dec 8 10:14:28 2025 -0600
TIKA-4557 create tika-proto in its own module - **Phase 1: Foundation (TIKA-4547)**
- Implement StateStore abstraction
- Refactor ExpiringFetcherStore
- Add InMemoryStateStore
- Update documentation
---
tika-grpc/pom.xml | 5 +
.../tika/pipes/grpc/ExpiringFetcherStore.java | 103 ------------
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 48 ++++--
...FetcherStoreTest.java => FetcherStoreTest.java} | 42 ++---
.../tika/pipes/api/statestore/StateStore.java | 124 ++++++++++++++
.../pipes/api/statestore/StateStoreException.java | 35 ++++
.../pipes/api/statestore/StateStoreFactory.java | 28 +++
.../pipes/core/statestore/ComponentSerializer.java | 112 ++++++++++++
.../pipes/core/statestore/InMemoryStateStore.java | 130 ++++++++++++++
.../core/statestore/InMemoryStateStoreFactory.java | 53 ++++++
.../pipes/core/statestore/StateStoreManager.java | 109 ++++++++++++
.../core/statestore/InMemoryStateStoreTest.java | 187 +++++++++++++++++++++
12 files changed, 831 insertions(+), 145 deletions(-)
diff --git a/tika-grpc/pom.xml b/tika-grpc/pom.xml
index fd4141737..387ac3000 100644
--- a/tika-grpc/pom.xml
+++ b/tika-grpc/pom.xml
@@ -218,6 +218,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-pipes-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-pipes-http</artifactId>
diff --git
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
deleted file mode 100644
index 70553d771..000000000
---
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.grpc;
-
-import java.time.Instant;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.plugins.ExtensionConfig;
-
-public class ExpiringFetcherStore implements AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(ExpiringFetcherStore.class);
- public static final long EXPIRE_JOB_INITIAL_DELAY = 1L;
- private final Map<String, Fetcher> fetchers =
Collections.synchronizedMap(new HashMap<>());
- private final Map<String, ExtensionConfig> fetcherConfigs =
Collections.synchronizedMap(new HashMap<>());
- private final Map<String, Instant> fetcherLastAccessed =
Collections.synchronizedMap(new HashMap<>());
-
- private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
-
- public ExpiringFetcherStore(int expireAfterSeconds, int
checkForExpiredFetchersDelaySeconds) {
- executorService.scheduleAtFixedRate(() -> {
- Set<String> expired = new HashSet<>();
- for (String fetcherPluginId : fetchers.keySet()) {
- Instant lastAccessed =
fetcherLastAccessed.get(fetcherPluginId);
- if (lastAccessed == null) {
- LOG.error("Detected a fetcher with no last access time.
FetcherName={}", fetcherPluginId);
- expired.add(fetcherPluginId);
- } else if (Instant
- .now()
-
.isAfter(lastAccessed.plusSeconds(expireAfterSeconds))) {
- LOG.info("Detected stale fetcher {} hasn't been accessed
in {} seconds. " + "Deleting.", fetcherPluginId, Instant
- .now()
- .getEpochSecond() - lastAccessed.getEpochSecond());
- expired.add(fetcherPluginId);
- }
- }
- for (String expiredFetcherId : expired) {
- deleteFetcher(expiredFetcherId);
- }
- }, EXPIRE_JOB_INITIAL_DELAY, checkForExpiredFetchersDelaySeconds,
TimeUnit.SECONDS);
- }
-
- public boolean deleteFetcher(String fetcherPluginId) {
- boolean success = fetchers.remove(fetcherPluginId) != null;
- fetcherConfigs.remove(fetcherPluginId);
- fetcherLastAccessed.remove(fetcherPluginId);
- return success;
- }
-
- public Map<String, Fetcher> getFetchers() {
- return fetchers;
- }
-
- public Map<String, ExtensionConfig> getFetcherConfigs() {
- return fetcherConfigs;
- }
-
- /**
- * This method will get the fetcher, but will also log the access the
fetcher as having
- * been accessed. This prevents the scheduled job from removing the stale
fetcher.
- */
- public <T extends Fetcher> T getFetcherAndLogAccess(String
fetcherPluginId) {
- fetcherLastAccessed.put(fetcherPluginId, Instant.now());
- return (T) fetchers.get(fetcherPluginId);
- }
-
- public <T extends Fetcher> void createFetcher(T fetcher, ExtensionConfig
config) {
- String id = fetcher.getExtensionConfig().id();
-
- fetchers.put(id, fetcher);
- fetcherConfigs.put(id, config);
- getFetcherAndLogAccess(id);
- }
-
- @Override
- public void close() {
- executorService.shutdownNow();
- }
-}
diff --git
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index cfea3203c..bf45eca0d 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -78,9 +78,13 @@ import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.api.fetcher.Fetcher;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.core.FetcherStore;
import org.apache.tika.pipes.core.PipesClient;
import org.apache.tika.pipes.core.PipesConfig;
+import org.apache.tika.pipes.core.statestore.StateStoreManager;
import org.apache.tika.plugins.ExtensionConfig;
+import org.apache.tika.plugins.TikaPluginManager;
import org.apache.tika.utils.XMLReaderUtils;
class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
@@ -97,7 +101,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
*/
PipesConfig pipesConfig;
PipesClient pipesClient;
- ExpiringFetcherStore expiringFetcherStore;
+ FetcherStore fetcherStore;
+ StateStore stateStore;
String tikaConfigPath;
@@ -114,10 +119,23 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
tikaConfigFile = tmpTikaConfigFile;
tikaConfigPath = tikaConfigFile.getAbsolutePath();
}
- pipesConfig =
TikaLoader.load(tikaConfigFile.toPath()).configs().load("pipes",
PipesConfig.class);
+
+ TikaLoader tikaLoader = TikaLoader.load(tikaConfigFile.toPath());
+ pipesConfig = tikaLoader.configs().load("pipes", PipesConfig.class);
pipesClient = new PipesClient(pipesConfig);
- expiringFetcherStore = new
ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(),
+ // Load StateStore for distributed state management
+ try {
+ TikaPluginManager pluginManager =
TikaPluginManager.load(tikaLoader.getConfig());
+ stateStore = StateStoreManager.load(pluginManager,
tikaLoader.getConfig());
+ LOG.info("Loaded StateStore: {}", stateStore.getClass().getName());
+ } catch (Exception e) {
+ LOG.warn("Failed to load StateStore from config, using default
in-memory store", e);
+ stateStore = StateStoreManager.createDefault();
+ }
+
+ fetcherStore = new FetcherStore(stateStore,
+ pipesConfig.getStaleFetcherTimeoutSeconds(),
pipesConfig.getStaleFetcherDelaySeconds());
this.tikaConfigPath = tikaConfigPath;
try {
@@ -139,10 +157,10 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
for (int i = 0; i < fetchersElement.getChildNodes().getLength(); ++i) {
fetchersElement.removeChild(fetchersElement.getChildNodes().item(i));
}
- for (var fetcherEntry : expiringFetcherStore.getFetchers().entrySet())
{
+ for (var fetcherEntry : fetcherStore.getFetchers().entrySet()) {
Fetcher fetcherObject = fetcherEntry.getValue();
Map<String, Object> fetcherConfigParams =
OBJECT_MAPPER.convertValue(
-
expiringFetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
+
fetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
new TypeReference<>() {
});
Element fetcher = tikaConfigDoc.createElement("fetcher");
@@ -219,7 +237,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
private void fetchAndParseImpl(FetchAndParseRequest request,
StreamObserver<FetchAndParseReply>
responseObserver) {
Fetcher fetcher =
-
expiringFetcherStore.getFetcherAndLogAccess(request.getFetcherId());
+ fetcherStore.getFetcherAndLogAccess(request.getFetcherId());
if (fetcher == null) {
throw new RuntimeException(
"Could not find fetcher with name " +
request.getFetcherId());
@@ -230,7 +248,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
String additionalFetchConfigJson =
request.getAdditionalFetchConfigJson();
if (StringUtils.isNotBlank(additionalFetchConfigJson)) {
// The fetch and parse has the option to specify additional
configuration
- ExtensionConfig abstractConfig = expiringFetcherStore
+ ExtensionConfig abstractConfig = fetcherStore
.getFetcherConfigs()
.get(fetcher.getExtensionConfig().id());
ConfigContainer configContainer = new ConfigContainer();
@@ -302,12 +320,12 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
Initializable initializable = (Initializable) abstractFetcher;
initializable.initialize(tikaParamsMap);
}
- if (expiringFetcherStore.deleteFetcher(name)) {
+ if (fetcherStore.deleteFetcher(name)) {
LOG.info("Updating fetcher {}", name);
} else {
LOG.info("Creating new fetcher {}", name);
}
- expiringFetcherStore.createFetcher(abstractFetcher, configObject);
+ fetcherStore.createFetcher(abstractFetcher, configObject);
} catch (ClassNotFoundException | InstantiationException |
IllegalAccessException |
InvocationTargetException | NoSuchMethodException |
TikaConfigException e) {
throw new RuntimeException(e);
@@ -336,8 +354,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
StreamObserver<GetFetcherReply> responseObserver) {
GetFetcherReply.Builder getFetcherReply = GetFetcherReply.newBuilder();
ExtensionConfig abstractConfig =
-
expiringFetcherStore.getFetcherConfigs().get(request.getFetcherId());
- Fetcher abstractFetcher =
expiringFetcherStore.getFetchers().get(request.getFetcherId());
+ fetcherStore.getFetcherConfigs().get(request.getFetcherId());
+ Fetcher abstractFetcher =
fetcherStore.getFetchers().get(request.getFetcherId());
if (abstractFetcher == null || abstractConfig == null) {
responseObserver.onError(StatusProto.toStatusException(notFoundStatus(request.getFetcherId())));
return;
@@ -355,7 +373,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
public void listFetchers(ListFetchersRequest request,
StreamObserver<ListFetchersReply>
responseObserver) {
ListFetchersReply.Builder listFetchersReplyBuilder =
ListFetchersReply.newBuilder();
- for (Map.Entry<String, ExtensionConfig> fetcherConfig :
expiringFetcherStore.getFetcherConfigs()
+ for (Map.Entry<String, ExtensionConfig> fetcherConfig :
fetcherStore.getFetcherConfigs()
.entrySet()) {
GetFetcherReply.Builder replyBuilder =
saveFetcherReply(fetcherConfig);
listFetchersReplyBuilder.addGetFetcherReplies(replyBuilder.build());
@@ -367,9 +385,9 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
private GetFetcherReply.Builder saveFetcherReply(
Map.Entry<String, ExtensionConfig> fetcherConfig) {
Fetcher abstractFetcher =
- expiringFetcherStore.getFetchers().get(fetcherConfig.getKey());
+ fetcherStore.getFetchers().get(fetcherConfig.getKey());
ExtensionConfig abstractConfig =
-
expiringFetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
+ fetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
GetFetcherReply.Builder replyBuilder =
GetFetcherReply.newBuilder().setFetcherClass(abstractFetcher.getClass().getName())
.setFetcherId(abstractFetcher.getExtensionConfig().id());
@@ -417,6 +435,6 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
}
private boolean deleteFetcher(String id) {
- return expiringFetcherStore.deleteFetcher(id);
+ return fetcherStore.deleteFetcher(id);
}
}
diff --git
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/FetcherStoreTest.java
similarity index 60%
rename from
tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
rename to
tika-grpc/src/test/java/org/apache/tika/pipes/grpc/FetcherStoreTest.java
index 21356a5ca..769a9f93b 100644
---
a/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
+++ b/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/FetcherStoreTest.java
@@ -16,33 +16,31 @@
*/
package org.apache.tika.pipes.grpc;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
+import java.util.concurrent.TimeUnit;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.fetcher.Fetcher;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.core.FetcherStore;
+import org.apache.tika.pipes.core.statestore.StateStoreManager;
import org.apache.tika.plugins.ExtensionConfig;
-class ExpiringFetcherStoreTest {
-
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
+class FetcherStoreTest {
@Test
- void createFetcher() throws Exception {
- try (ExpiringFetcherStore expiringFetcherStore = new
ExpiringFetcherStore(1, 5)) {
+ void createAndDeleteFetcher() throws Exception {
+ StateStore stateStore = StateStoreManager.createDefault();
+ try (FetcherStore fetcherStore = new FetcherStore(stateStore, 2000L,
1000L)) {
Fetcher fetcher = new Fetcher() {
@Override
- public InputStream fetch(String fetchKey, Metadata metadata,
ParseContext parseContext) throws TikaException, IOException {
+ public InputStream fetch(String fetchKey, Metadata metadata,
+ ParseContext parseContext) {
return null;
}
@@ -51,22 +49,12 @@ class ExpiringFetcherStoreTest {
return new ExtensionConfig("nick", "factory-plugin-id",
"{}");
}
};
- expiringFetcherStore.createFetcher(fetcher,
fetcher.getExtensionConfig());
-
- Assertions.assertNotNull(expiringFetcherStore
- .getFetchers()
- .get(fetcher.getExtensionConfig().id()));
-
- Awaitility
- .await()
- .atMost(Duration.ofSeconds(60))
- .until(() -> expiringFetcherStore
- .getFetchers()
- .get(fetcher.getExtensionConfig().id()) == null);
-
- assertNull(expiringFetcherStore
- .getFetcherConfigs()
+ fetcherStore.createFetcher(fetcher, fetcher.getExtensionConfig());
+ Assertions.assertNotNull(fetcherStore.getFetchers()
.get(fetcher.getExtensionConfig().id()));
+ Awaitility.await().atMost(Duration.ofSeconds(10))
+ .pollInterval(1000L, TimeUnit.MILLISECONDS)
+ .until(() ->
fetcherStore.getFetchers().get(fetcher.getExtensionConfig().id()) == null);
}
}
}
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStore.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStore.java
new file mode 100644
index 000000000..e33bbe859
--- /dev/null
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStore.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.statestore;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.tika.plugins.TikaExtension;
+
+/**
+ * StateStore provides a key-value store abstraction for distributed state management
+ * in Tika Pipes. This enables sharing of Fetcher, Emitter, and PipesIterator configurations
+ * across multiple nodes in a cluster.
+ * <p>
+ * Implementations can be in-memory (for single-node deployments) or backed by
+ * distributed systems like Apache Ignite, Redis, or Hazelcast.
+ * <p>
+ * Values are opaque byte arrays. A last-access timestamp can be recorded per key through
+ * {@link #updateAccessTime(String, Instant)} and read back through
+ * {@link #getAccessTime(String)}.
+ * <p>
+ * Keys use namespace prefixes to organize different types of data:
+ * <ul>
+ *   <li>fetcher:* - Fetcher configurations and instances</li>
+ *   <li>emitter:* - Emitter configurations and instances</li>
+ *   <li>pipesiterator:* - PipesIterator configurations</li>
+ * </ul>
+ */
+public interface StateStore extends TikaExtension, AutoCloseable {
+
+    /**
+     * Store a value associated with the given key.
+     *
+     * @param key the key to store the value under
+     * @param value the byte array value to store
+     * @throws StateStoreException if the operation fails
+     */
+    void put(String key, byte[] value) throws StateStoreException;
+
+    /**
+     * Retrieve the value associated with the given key.
+     *
+     * @param key the key to retrieve
+     * @return the byte array value, or {@code null} if the key doesn't exist
+     * @throws StateStoreException if the operation fails
+     */
+    byte[] get(String key) throws StateStoreException;
+
+    /**
+     * Delete the value associated with the given key.
+     *
+     * @param key the key to delete
+     * @return {@code true} if the key was deleted, {@code false} if it didn't exist
+     * @throws StateStoreException if the operation fails
+     */
+    boolean delete(String key) throws StateStoreException;
+
+    /**
+     * List all keys currently in the store.
+     *
+     * @return a set of all keys
+     * @throws StateStoreException if the operation fails
+     */
+    Set<String> listKeys() throws StateStoreException;
+
+    /**
+     * Update the last access time for the given key.
+     * This is used for expiration tracking.
+     *
+     * @param key the key to update
+     * @param accessTime the access time to record
+     * @throws StateStoreException if the operation fails
+     */
+    void updateAccessTime(String key, Instant accessTime) throws StateStoreException;
+
+    /**
+     * Get the last access time for the given key.
+     *
+     * @param key the key to query
+     * @return the last access time, or {@code null} if not tracked or key doesn't exist
+     * @throws StateStoreException if the operation fails
+     */
+    Instant getAccessTime(String key) throws StateStoreException;
+
+    /**
+     * Initialize the state store with the given configuration.
+     * This is called once before the store is used.
+     *
+     * @param config configuration parameters for the store
+     * @throws StateStoreException if initialization fails
+     */
+    void initialize(Map<String, String> config) throws StateStoreException;
+
+    /**
+     * Check if a key exists in the store.
+     * <p>
+     * The default implementation fetches the value with {@link #get(String)} and tests it
+     * for {@code null}; implementations may override it with a cheaper existence check.
+     *
+     * @param key the key to check
+     * @return {@code true} if the key exists, {@code false} otherwise
+     * @throws StateStoreException if the operation fails
+     */
+    default boolean containsKey(String key) throws StateStoreException {
+        return get(key) != null;
+    }
+
+    /**
+     * Close the state store and release any resources.
+     * After this is called, the store should not be used.
+     * <p>
+     * Narrows the {@code throws Exception} clause of {@link AutoCloseable#close()} to
+     * {@link StateStoreException}.
+     *
+     * @throws StateStoreException if closing fails
+     */
+    @Override
+    void close() throws StateStoreException;
+}
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreException.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreException.java
new file mode 100644
index 000000000..983fb1e7e
--- /dev/null
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.statestore;
+
+/**
+ * Checked exception thrown when operations on a StateStore fail.
+ */
+public class StateStoreException extends Exception {
+
+    /**
+     * Creates an exception with a description of the failure.
+     *
+     * @param message description of what went wrong
+     */
+    public StateStoreException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception with a description of the failure and its underlying cause.
+     *
+     * @param message description of what went wrong
+     * @param cause the exception that triggered this failure
+     */
+    public StateStoreException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause without an additional message.
+     *
+     * @param cause the exception that triggered this failure
+     */
+    public StateStoreException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreFactory.java
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreFactory.java
new file mode 100644
index 000000000..ee386afbe
--- /dev/null
+++
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/statestore/StateStoreFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.api.statestore;
+
+import org.apache.tika.plugins.TikaExtensionFactory;
+
+/**
+ * Factory interface for creating StateStore instances.
+ * Implementations should be annotated with {@code @Extension} to be discovered
+ * by the PF4J plugin system.
+ * <p>
+ * This interface declares no methods of its own; it only fixes the type parameter of
+ * {@link TikaExtensionFactory} to {@link StateStore}.
+ */
+public interface StateStoreFactory extends TikaExtensionFactory<StateStore> {
+
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/ComponentSerializer.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/ComponentSerializer.java
new file mode 100644
index 000000000..9a805927e
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/ComponentSerializer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * JSON codec for the values kept in a StateStore.
+ * <p>
+ * Objects are written as JSON text with a Jackson {@link ObjectMapper} and stored as the
+ * UTF-8 bytes of that text; reading reverses the two steps.
+ */
+public class ComponentSerializer {
+
+    private final ObjectMapper objectMapper;
+
+    /**
+     * Creates a serializer backed by a freshly constructed {@link ObjectMapper}.
+     */
+    public ComponentSerializer() {
+        this(new ObjectMapper());
+    }
+
+    /**
+     * Creates a serializer that uses the supplied mapper for all conversions.
+     *
+     * @param objectMapper the mapper to use
+     */
+    public ComponentSerializer(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+    }
+
+    /**
+     * Converts an ExtensionConfig to its UTF-8 encoded JSON form.
+     *
+     * @param config the config to serialize
+     * @return the JSON bytes
+     * @throws StateStoreException if the mapper cannot write the config
+     */
+    public byte[] serializeConfig(ExtensionConfig config) throws StateStoreException {
+        return toJsonBytes(config, "Failed to serialize ExtensionConfig");
+    }
+
+    /**
+     * Rebuilds an ExtensionConfig from UTF-8 encoded JSON.
+     *
+     * @param data the JSON bytes, may be {@code null}
+     * @return the config, or {@code null} when {@code data} is {@code null}
+     * @throws StateStoreException if the mapper cannot read the bytes
+     */
+    public ExtensionConfig deserializeConfig(byte[] data) throws StateStoreException {
+        return fromJsonBytes(data, ExtensionConfig.class,
+                "Failed to deserialize ExtensionConfig");
+    }
+
+    /**
+     * Converts an arbitrary object to its UTF-8 encoded JSON form.
+     *
+     * @param obj the object to serialize
+     * @return the JSON bytes
+     * @throws StateStoreException if the mapper cannot write the object
+     */
+    public byte[] serialize(Object obj) throws StateStoreException {
+        return toJsonBytes(obj, "Failed to serialize object");
+    }
+
+    /**
+     * Rebuilds an object of the requested type from UTF-8 encoded JSON.
+     *
+     * @param data the JSON bytes, may be {@code null}
+     * @param clazz the class to deserialize to
+     * @param <T> the type parameter
+     * @return the object, or {@code null} when {@code data} is {@code null}
+     * @throws StateStoreException if the mapper cannot read the bytes
+     */
+    public <T> T deserialize(byte[] data, Class<T> clazz) throws StateStoreException {
+        return fromJsonBytes(data, clazz, "Failed to deserialize object");
+    }
+
+    /** Writes {@code value} as JSON text and returns the UTF-8 bytes of that text. */
+    private byte[] toJsonBytes(Object value, String failureMessage)
+            throws StateStoreException {
+        final String json;
+        try {
+            json = objectMapper.writeValueAsString(value);
+        } catch (IOException e) {
+            throw new StateStoreException(failureMessage, e);
+        }
+        return json.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Decodes {@code bytes} as UTF-8 JSON text and maps it to {@code type}. */
+    private <T> T fromJsonBytes(byte[] bytes, Class<T> type, String failureMessage)
+            throws StateStoreException {
+        if (bytes == null) {
+            return null;
+        }
+        final String json = new String(bytes, StandardCharsets.UTF_8);
+        try {
+            return objectMapper.readValue(json, type);
+        } catch (IOException e) {
+            throw new StateStoreException(failureMessage, e);
+        }
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStore.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStore.java
new file mode 100644
index 000000000..305172ad2
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * In-memory implementation of StateStore using ConcurrentHashMap.
+ * This is the default implementation used for single-node deployments.
+ * <p>
+ * Values and access times live in two separate concurrent maps. Each single-map operation
+ * is thread-safe, but {@link #delete(String)} and {@link #close()} touch both maps and are
+ * not atomic as a whole. State is not persisted across restarts and is not shared across
+ * JVM instances.
+ * <p>
+ * Byte arrays are copied on the way in and on the way out, so callers can never mutate the
+ * stored state through an array reference they hold.
+ */
+public class InMemoryStateStore implements StateStore {
+
+    private final ConcurrentHashMap<String, byte[]> data = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Instant> accessTimes = new ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private ExtensionConfig extensionConfig;
+
+    /**
+     * Stores a private copy of {@code value} under {@code key}, replacing any previous value.
+     *
+     * @param key the key to store the value under, must not be {@code null}
+     * @param value the bytes to store, must not be {@code null}
+     * @throws StateStoreException if the store is closed or an argument is {@code null}
+     */
+    @Override
+    public void put(String key, byte[] value) throws StateStoreException {
+        checkNotClosed();
+        requireKey(key);
+        if (value == null) {
+            throw new StateStoreException("Value cannot be null");
+        }
+        // Copy so that later changes to the caller's array do not leak into the store.
+        data.put(key, value.clone());
+    }
+
+    /**
+     * Returns a copy of the value stored under {@code key}.
+     *
+     * @param key the key to retrieve, must not be {@code null}
+     * @return a copy of the stored bytes, or {@code null} if the key has no value
+     * @throws StateStoreException if the store is closed or the key is {@code null}
+     */
+    @Override
+    public byte[] get(String key) throws StateStoreException {
+        checkNotClosed();
+        requireKey(key);
+        byte[] stored = data.get(key);
+        // Hand out a copy so the caller cannot modify the stored array in place.
+        return stored == null ? null : stored.clone();
+    }
+
+    /**
+     * Reports whether a value is stored under {@code key} without copying the value.
+     *
+     * @param key the key to check, must not be {@code null}
+     * @return {@code true} if a value is stored under the key
+     * @throws StateStoreException if the store is closed or the key is {@code null}
+     */
+    @Override
+    public boolean containsKey(String key) throws StateStoreException {
+        checkNotClosed();
+        requireKey(key);
+        return data.containsKey(key);
+    }
+
+    /**
+     * Removes both the value and the recorded access time for {@code key}.
+     *
+     * @param key the key to delete, must not be {@code null}
+     * @return {@code true} if a value was stored under the key; an access time on its own
+     *         does not count
+     * @throws StateStoreException if the store is closed or the key is {@code null}
+     */
+    @Override
+    public boolean delete(String key) throws StateStoreException {
+        checkNotClosed();
+        requireKey(key);
+        accessTimes.remove(key);
+        return data.remove(key) != null;
+    }
+
+    /**
+     * Returns a snapshot of every key that has a value, an access time, or both.
+     *
+     * @return a new, independent set of keys
+     * @throws StateStoreException if the store is closed
+     */
+    @Override
+    public Set<String> listKeys() throws StateStoreException {
+        checkNotClosed();
+        // Return union of both data keys and accessTimes keys
+        // This allows expiration logic to find both config and access time entries
+        Set<String> allKeys = new java.util.HashSet<>(data.keySet());
+        allKeys.addAll(accessTimes.keySet());
+        return allKeys;
+    }
+
+    /**
+     * Records {@code accessTime} as the last access time of {@code key}. The key does not
+     * need to have a stored value.
+     *
+     * @param key the key to update, must not be {@code null}
+     * @param accessTime the time to record, must not be {@code null}
+     * @throws StateStoreException if the store is closed or an argument is {@code null}
+     */
+    @Override
+    public void updateAccessTime(String key, Instant accessTime) throws StateStoreException {
+        checkNotClosed();
+        requireKey(key);
+        if (accessTime == null) {
+            throw new StateStoreException("Access time cannot be null");
+        }
+        accessTimes.put(key, accessTime);
+    }
+
+    /**
+     * Returns the last access time recorded for {@code key}.
+     *
+     * @param key the key to query, must not be {@code null}
+     * @return the recorded time, or {@code null} if none has been recorded
+     * @throws StateStoreException if the store is closed or the key is {@code null}
+     */
+    @Override
+    public Instant getAccessTime(String key) throws StateStoreException {
+        checkNotClosed();
+        requireKey(key);
+        return accessTimes.get(key);
+    }
+
+    /**
+     * Does nothing; the in-memory store has no configuration.
+     *
+     * @param config ignored
+     */
+    @Override
+    public void initialize(Map<String, String> config) throws StateStoreException {
+        // No initialization needed for in-memory store
+    }
+
+    /**
+     * Marks the store as closed and discards all values and access times. Subsequent
+     * data operations throw {@link StateStoreException}. Calling this more than once
+     * is harmless.
+     */
+    @Override
+    public void close() throws StateStoreException {
+        closed = true;
+        data.clear();
+        accessTimes.clear();
+    }
+
+    @Override
+    public ExtensionConfig getExtensionConfig() {
+        return extensionConfig;
+    }
+
+    public void setExtensionConfig(ExtensionConfig extensionConfig) {
+        this.extensionConfig = extensionConfig;
+    }
+
+    /** Rejects any operation attempted after {@link #close()}. */
+    private void checkNotClosed() throws StateStoreException {
+        if (closed) {
+            throw new StateStoreException("StateStore has been closed");
+        }
+    }
+
+    /** Rejects {@code null} keys with the same message for every operation. */
+    private static void requireKey(String key) throws StateStoreException {
+        if (key == null) {
+            throw new StateStoreException("Key cannot be null");
+        }
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreFactory.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreFactory.java
new file mode 100644
index 000000000..bee273292
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.io.IOException;
+
+import org.pf4j.Extension;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.pipes.api.statestore.StateStoreFactory;
+import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * {@link StateStoreFactory} extension that builds {@link InMemoryStateStore} instances.
+ * The factory is identified by the name {@code "in-memory"}.
+ */
+@Extension
+public class InMemoryStateStoreFactory implements StateStoreFactory {
+
+    /**
+     * Returns the name identifying this factory.
+     *
+     * @return the literal {@code "in-memory"}
+     */
+    @Override
+    public String getName() {
+        return "in-memory";
+    }
+
+    /**
+     * Creates a new {@link InMemoryStateStore}, attaches the given extension config to it
+     * and initializes it.
+     *
+     * @param extensionConfig the extension configuration to attach to the new store
+     * @return the initialized store
+     * @throws IOException declared by the factory contract; not raised by this method's own code
+     * @throws TikaConfigException if initializing the store fails
+     */
+    @Override
+    public StateStore buildExtension(ExtensionConfig extensionConfig)
+            throws IOException, TikaConfigException {
+        final InMemoryStateStore inMemoryStore = new InMemoryStateStore();
+        inMemoryStore.setExtensionConfig(extensionConfig);
+        try {
+            // The in-memory store takes no settings, so initialize it without a config map.
+            inMemoryStore.initialize(null);
+            return inMemoryStore;
+        } catch (StateStoreException cause) {
+            throw new TikaConfigException("Failed to initialize InMemoryStateStore", cause);
+        }
+    }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/StateStoreManager.java
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/StateStoreManager.java
new file mode 100644
index 000000000..09e5a31c7
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/statestore/StateStoreManager.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.pf4j.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.loader.TikaJsonConfig;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+import org.apache.tika.pipes.api.statestore.StateStoreFactory;
+import org.apache.tika.plugins.PluginComponentLoader;
+
+/**
+ * Manager for loading StateStore implementations from configuration.
+ * This follows the same pattern as FetcherManager and EmitterManager.
+ */
+public class StateStoreManager {
+
+ public static final String CONFIG_KEY = "stateStore";
+ private static final Logger LOG =
LoggerFactory.getLogger(StateStoreManager.class);
+
+ /**
+ * Load a StateStore from the given configuration.
+ * If no state store is configured, returns an InMemoryStateStore by
default.
+ *
+ * @param pluginManager the plugin manager for loading factories
+ * @param tikaJsonConfig the Tika configuration
+ * @return a configured StateStore instance
+ * @throws IOException if loading fails
+ * @throws TikaConfigException if configuration is invalid
+ */
+ public static StateStore load(PluginManager pluginManager, TikaJsonConfig
tikaJsonConfig)
+ throws IOException, TikaConfigException {
+ JsonNode stateStoreNode = tikaJsonConfig.getRootNode().get(CONFIG_KEY);
+
+ if (stateStoreNode == null) {
+ LOG.info("No state store configured, using default
InMemoryStateStore");
+ InMemoryStateStore store = new InMemoryStateStore();
+ try {
+ store.initialize(null);
+ } catch (StateStoreException e) {
+ throw new TikaConfigException("Failed to initialize default
InMemoryStateStore", e);
+ }
+ return store;
+ }
+
+ try {
+ Map<String, StateStore> stores =
+ PluginComponentLoader.loadInstances(pluginManager,
+ StateStoreFactory.class,
+ stateStoreNode);
+
+ if (stores.isEmpty()) {
+ throw new TikaConfigException("No state store instances loaded
from configuration");
+ }
+
+ if (stores.size() > 1) {
+ LOG.warn("Multiple state stores configured, using the first
one: {}",
+ stores.keySet().iterator().next());
+ }
+
+ // Return the first (and typically only) state store
+ StateStore store = stores.values().iterator().next();
+ LOG.info("Loaded state store: {}", store.getClass().getName());
+ return store;
+
+ } catch (Exception e) {
+ throw new TikaConfigException("Failed to load state store from
configuration", e);
+ }
+ }
+
+ /**
+ * Create a default in-memory state store.
+ * This is useful for testing and single-node deployments.
+ *
+ * @return a new InMemoryStateStore instance
+ * @throws TikaConfigException if initialization fails
+ */
+ public static StateStore createDefault() throws TikaConfigException {
+ InMemoryStateStore store = new InMemoryStateStore();
+ try {
+ store.initialize(null);
+ } catch (StateStoreException e) {
+ throw new TikaConfigException("Failed to initialize default
InMemoryStateStore", e);
+ }
+ return store;
+ }
+}
diff --git
a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreTest.java
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreTest.java
new file mode 100644
index 000000000..a9b943628
--- /dev/null
+++
b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/statestore/InMemoryStateStoreTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.statestore;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Set;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.api.statestore.StateStore;
+import org.apache.tika.pipes.api.statestore.StateStoreException;
+
+public class InMemoryStateStoreTest {
+
+ private StateStore stateStore;
+
+ @BeforeEach
+ public void setUp() throws StateStoreException {
+ stateStore = new InMemoryStateStore();
+ stateStore.initialize(null);
+ }
+
+ @AfterEach
+ public void tearDown() throws StateStoreException {
+ if (stateStore != null) {
+ stateStore.close();
+ }
+ }
+
+ @Test
+ public void testPutAndGet() throws StateStoreException {
+ byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+ stateStore.put("test-key", value);
+
+ byte[] retrieved = stateStore.get("test-key");
+ assertNotNull(retrieved);
+ assertArrayEquals(value, retrieved);
+ }
+
+ @Test
+ public void testGetNonExistent() throws StateStoreException {
+ byte[] retrieved = stateStore.get("non-existent");
+ assertNull(retrieved);
+ }
+
+ @Test
+ public void testDelete() throws StateStoreException {
+ byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+ stateStore.put("test-key", value);
+
+ assertTrue(stateStore.delete("test-key"));
+ assertNull(stateStore.get("test-key"));
+ }
+
+ @Test
+ public void testDeleteNonExistent() throws StateStoreException {
+ assertFalse(stateStore.delete("non-existent"));
+ }
+
+ @Test
+ public void testListKeys() throws StateStoreException {
+ stateStore.put("key1", "value1".getBytes(StandardCharsets.UTF_8));
+ stateStore.put("key2", "value2".getBytes(StandardCharsets.UTF_8));
+ stateStore.put("key3", "value3".getBytes(StandardCharsets.UTF_8));
+
+ Set<String> keys = stateStore.listKeys();
+ assertEquals(3, keys.size());
+ assertTrue(keys.contains("key1"));
+ assertTrue(keys.contains("key2"));
+ assertTrue(keys.contains("key3"));
+ }
+
+ @Test
+ public void testAccessTime() throws StateStoreException {
+ Instant now = Instant.now();
+ stateStore.updateAccessTime("test-key", now);
+
+ Instant retrieved = stateStore.getAccessTime("test-key");
+ assertNotNull(retrieved);
+ assertEquals(now, retrieved);
+ }
+
+ @Test
+ public void testAccessTimeNonExistent() throws StateStoreException {
+ Instant retrieved = stateStore.getAccessTime("non-existent");
+ assertNull(retrieved);
+ }
+
+ @Test
+ public void testAccessTimeDeletedWithKey() throws StateStoreException {
+ byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+ Instant now = Instant.now();
+
+ stateStore.put("test-key", value);
+ stateStore.updateAccessTime("test-key", now);
+
+ stateStore.delete("test-key");
+
+ assertNull(stateStore.getAccessTime("test-key"));
+ }
+
+ @Test
+ public void testContainsKey() throws StateStoreException {
+ byte[] value = "test-value".getBytes(StandardCharsets.UTF_8);
+ stateStore.put("test-key", value);
+
+ assertTrue(stateStore.containsKey("test-key"));
+ assertFalse(stateStore.containsKey("non-existent"));
+ }
+
+ @Test
+ public void testNullKeyThrows() {
+ assertThrows(StateStoreException.class, () -> stateStore.put(null, new
byte[0]));
+ assertThrows(StateStoreException.class, () -> stateStore.get(null));
+ assertThrows(StateStoreException.class, () -> stateStore.delete(null));
+ assertThrows(StateStoreException.class,
+ () -> stateStore.updateAccessTime(null, Instant.now()));
+ assertThrows(StateStoreException.class, () ->
stateStore.getAccessTime(null));
+ }
+
+ @Test
+ public void testNullValueThrows() {
+ assertThrows(StateStoreException.class, () -> stateStore.put("key",
null));
+ }
+
+ @Test
+ public void testNullAccessTimeThrows() {
+ assertThrows(StateStoreException.class, () ->
stateStore.updateAccessTime("key", null));
+ }
+
+ @Test
+ public void testOperationsAfterCloseThrow() throws StateStoreException {
+ stateStore.close();
+
+ assertThrows(StateStoreException.class,
+ () -> stateStore.put("key",
"value".getBytes(StandardCharsets.UTF_8)));
+ assertThrows(StateStoreException.class, () -> stateStore.get("key"));
+ assertThrows(StateStoreException.class, () ->
stateStore.delete("key"));
+ assertThrows(StateStoreException.class, () -> stateStore.listKeys());
+ assertThrows(StateStoreException.class,
+ () -> stateStore.updateAccessTime("key", Instant.now()));
+ assertThrows(StateStoreException.class, () ->
stateStore.getAccessTime("key"));
+ }
+
+ @Test
+ public void testNamespacePrefixes() throws StateStoreException {
+ // Test that keys with different namespace prefixes are handled
correctly
+ stateStore.put("fetcher:config:my-fetcher",
+ "fetcher-data".getBytes(StandardCharsets.UTF_8));
+ stateStore.put("emitter:config:my-emitter",
+ "emitter-data".getBytes(StandardCharsets.UTF_8));
+ stateStore.put("pipesiterator:my-iterator",
+ "iterator-data".getBytes(StandardCharsets.UTF_8));
+
+ Set<String> keys = stateStore.listKeys();
+ assertEquals(3, keys.size());
+
+ assertNotNull(stateStore.get("fetcher:config:my-fetcher"));
+ assertNotNull(stateStore.get("emitter:config:my-emitter"));
+ assertNotNull(stateStore.get("pipesiterator:my-iterator"));
+ }
+}