This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch file-based-config-store
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/file-based-config-store by
this push:
new 8954c37b7 Add embedded Ignite server architecture
8954c37b7 is described below
commit 8954c37b743dc3eabe475f826c45c559b0c2e081
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Sun Dec 28 14:34:14 2025 -0600
Add embedded Ignite server architecture
- Created IgniteStoreServer class that runs as an embedded Ignite server node
- TikaGrpcServerImpl starts the embedded Ignite server before creating the
ConfigStore (only when the "ignite" ConfigStore type is configured)
- IgniteConfigStore now acts as an Ignite client node only (clientMode=true)
- No external Ignite cluster needed in Docker/Kubernetes
- Server is started from a background daemon thread within the tika-grpc process
- Clients (gRPC + forked PipesServer) connect to the embedded server
Architecture:
┌─────────────────────────────────┐
│ tika-grpc Process │
│ ┌──────────────────────────┐ │
│ │ IgniteStoreServer │ │
│ │ (server mode, daemon) │ │
│ └────────▲─────────────────┘ │
│ │ │
│ ┌────────┴─────────────────┐ │
│ │ IgniteConfigStore │ │
│ │ (client mode) │ │
│ └──────────────────────────┘ │
└─────────────────────────────────┘
▲
│ (client connection)
│
┌────────┴─────────────────┐
│ PipesServer (forked) │
│ IgniteConfigStore │
│ (client mode) │
└──────────────────────────┘
---
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 39 +++++++
.../tika/pipes/ignite/IgniteConfigStore.java | 21 ++--
.../pipes/ignite/server/IgniteStoreServer.java | 112 +++++++++++++++++++++
3 files changed, 164 insertions(+), 8 deletions(-)
diff --git
a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 26579a2c1..f575aa56b 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -131,11 +131,50 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
ExtensionConfig storeConfig = new ExtensionConfig(
configStoreType, configStoreType, configStoreParams);
+ // If using Ignite, start the embedded server first
+ if ("ignite".equalsIgnoreCase(configStoreType)) {
+ startIgniteServer(storeConfig);
+ }
+
return ConfigStoreFactory.createConfigStore(
pluginManager,
configStoreType,
storeConfig);
}
+
+ private void startIgniteServer(ExtensionConfig config) {
+ try {
+ LOG.info("Starting embedded Ignite server for ConfigStore");
+
+ // Parse config to get Ignite settings
+ com.fasterxml.jackson.databind.ObjectMapper mapper = new
com.fasterxml.jackson.databind.ObjectMapper();
+ com.fasterxml.jackson.databind.JsonNode params =
mapper.readTree(config.json());
+
+ String cacheName = params.has("cacheName") ?
params.get("cacheName").asText() : "tika-config-store";
+ String cacheMode = params.has("cacheMode") ?
params.get("cacheMode").asText() : "REPLICATED";
+ String instanceName = params.has("igniteInstanceName") ?
params.get("igniteInstanceName").asText() : "TikaIgniteServer";
+
+ // Dynamically load and start server (avoid compile-time
dependency)
+ Class<?> serverClass =
Class.forName("org.apache.tika.pipes.ignite.server.IgniteStoreServer");
+ Class<?> cacheModeClass =
Class.forName("org.apache.ignite.cache.CacheMode");
+ Object cacheModeEnum = Enum.valueOf((Class<Enum>) cacheModeClass,
cacheMode);
+
+ Object server = serverClass
+ .getConstructor(String.class, cacheModeClass, String.class)
+ .newInstance(cacheName, cacheModeEnum, instanceName);
+
+ serverClass.getMethod("startAsync").invoke(server);
+
+ LOG.info("Embedded Ignite server started successfully");
+
+ } catch (ClassNotFoundException e) {
+ LOG.warn("Ignite server class not found - skipping embedded server
startup. " +
+ "Make sure tika-pipes-ignite plugin is loaded.");
+ } catch (Exception e) {
+ LOG.error("Failed to start embedded Ignite server", e);
+ throw new RuntimeException("Failed to start Ignite server", e);
+ }
+ }
@Override
public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
diff --git
a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
index cd65fb858..b76ea4eab 100644
---
a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
+++
b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
@@ -97,17 +97,22 @@ public class IgniteConfigStore implements ConfigStore {
cacheName, cacheMode, igniteInstanceName);
IgniteConfiguration cfg = new IgniteConfiguration();
- cfg.setIgniteInstanceName(igniteInstanceName);
- cfg.setClientMode(false);
+ cfg.setIgniteInstanceName(igniteInstanceName + "-Client");
+ cfg.setClientMode(true); // Client mode - connects to embedded server
ignite = Ignition.start(cfg);
- CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = new
CacheConfiguration<>(cacheName);
- cacheCfg.setCacheMode(cacheMode);
- cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
-
- cache = ignite.getOrCreateCache(cacheCfg);
- LOG.info("IgniteConfigStore initialized successfully");
+ // Get cache (it should already exist on the server)
+ cache = ignite.cache(cacheName);
+ if (cache == null) {
+ // If not found, create it (shouldn't happen if server started
first)
+ LOG.warn("Cache {} not found on server, creating it", cacheName);
+ CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = new
CacheConfiguration<>(cacheName);
+ cacheCfg.setCacheMode(cacheMode);
+ cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
+ cache = ignite.getOrCreateCache(cacheCfg);
+ }
+ LOG.info("IgniteConfigStore initialized successfully as client");
}
@Override
diff --git
a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
new file mode 100644
index 000000000..498e063b3
--- /dev/null
+++
b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite.server;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.ignite.ExtensionConfigDTO;
+
+/**
+ * Embedded Ignite server that hosts the distributed cache.
+ * This runs as a background thread within the tika-grpc process.
+ * Tika gRPC and forked PipesServer instances connect as clients.
+ */
+public class IgniteStoreServer implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IgniteStoreServer.class);
+ private static final String DEFAULT_CACHE_NAME = "tika-config-store";
+ private static final String DEFAULT_INSTANCE_NAME = "TikaIgniteServer";
+
+ private Ignite ignite;
+ private final String cacheName;
+ private final CacheMode cacheMode;
+ private final String instanceName;
+
+ public IgniteStoreServer() {
+ this(DEFAULT_CACHE_NAME, CacheMode.REPLICATED, DEFAULT_INSTANCE_NAME);
+ }
+
+ public IgniteStoreServer(String cacheName, CacheMode cacheMode, String
instanceName) {
+ this.cacheName = cacheName;
+ this.cacheMode = cacheMode;
+ this.instanceName = instanceName;
+ }
+
+ /**
+ * Start the Ignite server node in a background daemon thread.
+ */
+ public void startAsync() {
+ Thread serverThread = new Thread(() -> {
+ try {
+ start();
+ } catch (Exception e) {
+ LOG.error("Failed to start Ignite server", e);
+ }
+ }, "IgniteServerThread");
+ serverThread.setDaemon(true);
+ serverThread.start();
+
+ // Wait for server to initialize
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void start() throws Exception {
+ LOG.info("Starting Ignite server: instance={}, cache={}, mode={}",
+ instanceName, cacheName, cacheMode);
+
+ IgniteConfiguration cfg = new IgniteConfiguration();
+ cfg.setIgniteInstanceName(instanceName);
+ cfg.setClientMode(false); // Server mode
+ cfg.setPeerClassLoadingEnabled(true);
+
+ ignite = Ignition.start(cfg);
+
+ CacheConfiguration<String, ExtensionConfigDTO> cacheCfg =
+ new CacheConfiguration<>(cacheName);
+ cacheCfg.setCacheMode(cacheMode);
+ cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
+
+ IgniteCache<String, ExtensionConfigDTO> cache =
ignite.getOrCreateCache(cacheCfg);
+
+ LOG.info("Ignite server started successfully with cache: {}",
cache.getName());
+ LOG.info("Ignite topology: {} nodes", ignite.cluster().nodes().size());
+ }
+
+ public boolean isRunning() {
+ return ignite != null;
+ }
+
+ @Override
+ public void close() {
+ if (ignite != null) {
+ LOG.info("Stopping Ignite server: {}", instanceName);
+ ignite.close();
+ ignite = null;
+ }
+ }
+}