This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4547-phase1
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-4547-phase1 by this push:
     new fca1536a4 TIKA-4557 create tika-proto in its own module - **Phase 1: Foundation (TIKA-4547)**
                   - Implement StateStore abstraction
                   - Refactor ExpiringFetcherStore
                   - Add InMemoryStateStore
                   - Update documentation
fca1536a4 is described below

commit fca1536a45b3cc5ee75d5d5bee79a05a801229ad
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Mon Dec 8 10:14:39 2025 -0600

    TIKA-4557 create tika-proto in its own module - **Phase 1: Foundation (TIKA-4547)**
                                                    - Implement StateStore abstraction
                                                    - Refactor ExpiringFetcherStore
                                                    - Add InMemoryStateStore
                                                    - Update documentation
---
 .../org/apache/tika/pipes/core/FetcherStore.java   | 258 +++++++++++++++++++++
 1 file changed, 258 insertions(+)

diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/FetcherStore.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/FetcherStore.java
new file mode 100644
index 000000000..1d36ff42d
--- /dev/null
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/FetcherStore.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core;
+
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.pipes.api.fetcher.Fetcher;
import org.apache.tika.pipes.api.statestore.StateStore;
import org.apache.tika.pipes.api.statestore.StateStoreException;
import org.apache.tika.pipes.core.statestore.ComponentSerializer;
import org.apache.tika.plugins.ExtensionConfig;
+
+/**
+ * FetcherStore manages Fetcher instances using StateStore for distributed 
state management.
+ * Provides automatic expiration of stale fetchers based on last access time.
+ * <p>
+ * Key namespaces used:
+ * <ul>
+ *   <li>fetcher:config:{id} - ExtensionConfig for the fetcher</li>
+ *   <li>fetcher:access:{id} - Last access time tracking</li>
+ * </ul>
+ * <p>
+ * Fetcher instances are kept in local memory cache for performance,
+ * while configurations are stored in StateStore for cluster-wide sharing.
+ */
+public class FetcherStore implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FetcherStore.class);
+    public static final long EXPIRE_JOB_INITIAL_DELAY = 1L;
+
+    private static final String FETCHER_CONFIG_PREFIX = "fetcher:config:";
+    private static final String FETCHER_ACCESS_PREFIX = "fetcher:access:";
+
+    private final StateStore stateStore;
+    private final ComponentSerializer serializer;
+    private final long expireAfterMillis;
+    private final long checkForExpiredDelayMillis;
+
+    // Local cache of fetcher instances for performance
+    private final Map<String, Fetcher> fetcherCache = new 
ConcurrentHashMap<>();
+
+    private final ScheduledExecutorService executorService =
+            Executors.newSingleThreadScheduledExecutor();
+
+    /**
+     * Create a FetcherStore with the given StateStore backend.
+     *
+     * @param stateStore the state store for distributed state
+     * @param expireAfterMillis how long before fetchers expire (milliseconds)
+     * @param checkForExpiredDelayMillis how often to check for expired 
fetchers (milliseconds)
+     */
+    public FetcherStore(StateStore stateStore, long expireAfterMillis,
+                        long checkForExpiredDelayMillis) {
+        this.stateStore = stateStore;
+        this.serializer = new ComponentSerializer();
+        this.expireAfterMillis = expireAfterMillis;
+        this.checkForExpiredDelayMillis = checkForExpiredDelayMillis;
+
+        // Start expiration check job
+        executorService.scheduleAtFixedRate(this::checkAndRemoveExpired,
+                EXPIRE_JOB_INITIAL_DELAY, checkForExpiredDelayMillis, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Legacy constructor for backward compatibility (takes seconds).
+     *
+     * @param stateStore the state store
+     * @param expireAfterSeconds how long before fetchers expire (seconds)
+     * @param checkForExpiredDelaySeconds how often to check for expired 
fetchers (seconds)
+     */
+    public FetcherStore(StateStore stateStore, int expireAfterSeconds,
+                        int checkForExpiredDelaySeconds) {
+        this(stateStore, expireAfterSeconds * 1000L, 
checkForExpiredDelaySeconds * 1000L);
+    }
+
+    private void checkAndRemoveExpired() {
+        Set<String> expired = new HashSet<>();
+        try {
+            Set<String> keys = stateStore.listKeys();
+            for (String key : keys) {
+                LOG.info("Key in state store: {}", key);
+                if (key.startsWith(FETCHER_ACCESS_PREFIX)) {
+                    String fetcherId = 
key.substring(FETCHER_ACCESS_PREFIX.length());
+                    Instant lastAccessed = stateStore.getAccessTime(key);
+                    if (lastAccessed == null) {
+                        LOG.error("Detected a fetcher with no last access 
time. " +
+                                "FetcherName={}", fetcherId);
+                        expired.add(fetcherId);
+                    } else if (Instant.now()
+                            
.isAfter(lastAccessed.plusMillis(expireAfterMillis))) {
+                        long elapsedMillis = Instant.now().toEpochMilli() -
+                                lastAccessed.toEpochMilli();
+                        LOG.info("Detected stale fetcher {} hasn't been 
accessed in {} ms. " +
+                                "Deleting.", fetcherId, elapsedMillis);
+                        expired.add(fetcherId);
+                    }
+                }
+            }
+            for (String expiredFetcherId : expired) {
+                deleteFetcher(expiredFetcherId);
+            }
+        } catch (StateStoreException e) {
+            LOG.error("Error checking for expired fetchers", e);
+        }
+    }
+
+    /**
+     * Delete a fetcher from the store.
+     *
+     * @param fetcherPluginId the fetcher ID to delete
+     * @return true if the fetcher was deleted, false if it didn't exist
+     */
+    public boolean deleteFetcher(String fetcherPluginId) {
+        try {
+            fetcherCache.remove(fetcherPluginId);
+            boolean configDeleted =
+                    stateStore.delete(FETCHER_CONFIG_PREFIX + fetcherPluginId);
+            stateStore.delete(FETCHER_ACCESS_PREFIX + fetcherPluginId);
+            return configDeleted;
+        } catch (StateStoreException e) {
+            LOG.error("Error deleting fetcher: {}", fetcherPluginId, e);
+            return false;
+        }
+    }
+
+    /**
+     * Get all fetchers currently in the store.
+     *
+     * @return map of fetcher ID to Fetcher instance
+     */
+    public Map<String, Fetcher> getFetchers() {
+        Map<String, Fetcher> result = new HashMap<>();
+        try {
+            Set<String> keys = stateStore.listKeys();
+            for (String key : keys) {
+                if (key.startsWith(FETCHER_CONFIG_PREFIX)) {
+                    String fetcherId = 
key.substring(FETCHER_CONFIG_PREFIX.length());
+                    Fetcher fetcher = fetcherCache.get(fetcherId);
+                    if (fetcher != null) {
+                        result.put(fetcherId, fetcher);
+                    }
+                }
+            }
+        } catch (StateStoreException e) {
+            LOG.error("Error getting fetchers", e);
+        }
+        return result;
+    }
+
+    /**
+     * Get all fetcher configurations currently in the store.
+     *
+     * @return map of fetcher ID to ExtensionConfig
+     */
+    public Map<String, ExtensionConfig> getFetcherConfigs() {
+        Map<String, ExtensionConfig> result = new HashMap<>();
+        try {
+            Set<String> keys = stateStore.listKeys();
+            for (String key : keys) {
+                if (key.startsWith(FETCHER_CONFIG_PREFIX)) {
+                    String fetcherId = 
key.substring(FETCHER_CONFIG_PREFIX.length());
+                    byte[] configData = stateStore.get(key);
+                    if (configData != null) {
+                        ExtensionConfig config = 
serializer.deserializeConfig(configData);
+                        result.put(fetcherId, config);
+                    }
+                }
+            }
+        } catch (StateStoreException e) {
+            LOG.error("Error getting fetcher configs", e);
+        }
+        return result;
+    }
+
+    /**
+     * Get a fetcher and log its access time.
+     * This prevents the scheduled job from removing the stale fetcher.
+     *
+     * @param fetcherPluginId the fetcher ID to retrieve
+     * @param <T> the fetcher type
+     * @return the fetcher instance, or null if not found
+     */
+    public <T extends Fetcher> T getFetcherAndLogAccess(String 
fetcherPluginId) {
+        try {
+            // Update access time in state store
+            String accessKey = FETCHER_ACCESS_PREFIX + fetcherPluginId;
+            stateStore.updateAccessTime(accessKey, Instant.now());
+
+            // Return from cache
+            return (T) fetcherCache.get(fetcherPluginId);
+        } catch (StateStoreException e) {
+            LOG.error("Error logging access for fetcher: {}", fetcherPluginId, 
e);
+            return (T) fetcherCache.get(fetcherPluginId);
+        }
+    }
+
+    /**
+     * Create and store a new fetcher.
+     *
+     * @param fetcher the fetcher instance
+     * @param config the fetcher configuration
+     * @param <T> the fetcher type
+     */
+    public <T extends Fetcher> void createFetcher(T fetcher, ExtensionConfig 
config) {
+        String id = fetcher.getExtensionConfig().id();
+
+        try {
+            // Store config in state store for cluster-wide sharing
+            byte[] configData = serializer.serializeConfig(config);
+            stateStore.put(FETCHER_CONFIG_PREFIX + id, configData);
+
+            // Cache the fetcher instance locally
+            fetcherCache.put(id, fetcher);
+
+            // Log initial access
+            getFetcherAndLogAccess(id);
+
+            LOG.info("Created fetcher: {}", id);
+        } catch (StateStoreException e) {
+            LOG.error("Error creating fetcher: {}", id, e);
+            throw new RuntimeException("Failed to create fetcher: " + id, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        executorService.shutdownNow();
+        try {
+            stateStore.close();
+        } catch (StateStoreException e) {
+            LOG.error("Error closing state store", e);
+        }
+    }
+}

Reply via email to