This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch file-based-config-store
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/file-based-config-store by 
this push:
     new 8954c37b7 Add embedded Ignite server architecture
8954c37b7 is described below

commit 8954c37b743dc3eabe475f826c45c559b0c2e081
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Sun Dec 28 14:34:14 2025 -0600

    Add embedded Ignite server architecture
    
    - Created IgniteStoreServer class that runs as embedded server
    - TikaGrpcServer starts Ignite server on startup (if ignite ConfigStore configured)
    - IgniteConfigStore now acts as client-only (clientMode=true)
    - No external Ignite dependency needed in Docker/Kubernetes
    - Server runs in background daemon thread within tika-grpc process
    - Clients (gRPC + forked PipesServer) connect to embedded server
    
    Architecture:
      ┌─────────────────────────────────┐
      │      tika-grpc Process         │
      │  ┌──────────────────────────┐  │
      │  │  IgniteStoreServer       │  │
      │  │  (server mode, daemon)   │  │
      │  └────────▲─────────────────┘  │
      │           │                     │
      │  ┌────────┴─────────────────┐  │
      │  │ IgniteConfigStore        │  │
      │  │ (client mode)            │  │
      │  └──────────────────────────┘  │
      └─────────────────────────────────┘
               ▲
               │ (client connection)
               │
      ┌────────┴─────────────────┐
      │  PipesServer (forked)    │
      │  IgniteConfigStore       │
      │  (client mode)           │
      └──────────────────────────┘
---
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java |  39 +++++++
 .../tika/pipes/ignite/IgniteConfigStore.java       |  21 ++--
 .../pipes/ignite/server/IgniteStoreServer.java     | 112 +++++++++++++++++++++
 3 files changed, 164 insertions(+), 8 deletions(-)

diff --git a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index 26579a2c1..f575aa56b 100644
--- a/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -131,11 +131,50 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         ExtensionConfig storeConfig = new ExtensionConfig(
             configStoreType, configStoreType, configStoreParams);
 
+        // If using Ignite, start the embedded server first
+        if ("ignite".equalsIgnoreCase(configStoreType)) {
+            startIgniteServer(storeConfig);
+        }
+
         return ConfigStoreFactory.createConfigStore(
                 pluginManager,
                 configStoreType,
                 storeConfig);
     }
+    
+    /**
+     * Starts the embedded Ignite server that backs the "ignite" ConfigStore.
+     * <p>
+     * Reads {@code cacheName}, {@code cacheMode} and {@code igniteInstanceName} from the
+     * store's JSON parameters (each falls back to a default when absent or null), then
+     * instantiates {@code IgniteStoreServer} reflectively so that this module has no
+     * compile-time dependency on the Ignite plugin, and calls its {@code startAsync()}.
+     * <p>
+     * If the server class or Ignite's {@code CacheMode} cannot be loaded, startup is skipped
+     * with a warning. The created server instance is not retained by this method.
+     *
+     * @param config ConfigStore extension config whose {@code json()} holds the Ignite settings
+     * @throws RuntimeException if the parameters cannot be parsed, the cache mode is unknown,
+     *                          or the server fails to start
+     */
+    private void startIgniteServer(ExtensionConfig config) {
+        try {
+            LOG.info("Starting embedded Ignite server for ConfigStore");
+
+            // Parse config to get Ignite settings. path(..).asText(default) yields the default
+            // for both missing fields and explicit JSON nulls (has()/get() would turn a JSON
+            // null into the literal string "null").
+            com.fasterxml.jackson.databind.JsonNode params =
+                    new com.fasterxml.jackson.databind.ObjectMapper().readTree(config.json());
+
+            String cacheName = params.path("cacheName").asText("tika-config-store");
+            String cacheMode = params.path("cacheMode").asText("REPLICATED").trim();
+            String instanceName = params.path("igniteInstanceName").asText("TikaIgniteServer");
+
+            // Dynamically load and start server (avoid compile-time dependency)
+            Class<?> serverClass =
+                    Class.forName("org.apache.tika.pipes.ignite.server.IgniteStoreServer");
+            Class<?> cacheModeClass = Class.forName("org.apache.ignite.cache.CacheMode");
+
+            // Resolve the enum constant by name without a raw/unchecked Enum.valueOf cast;
+            // matching is case-insensitive, like the config store type check.
+            Object cacheModeEnum = null;
+            Object[] cacheModeConstants = cacheModeClass.getEnumConstants();
+            if (cacheModeConstants != null) {
+                for (Object candidate : cacheModeConstants) {
+                    if (((Enum<?>) candidate).name().equalsIgnoreCase(cacheMode)) {
+                        cacheModeEnum = candidate;
+                        break;
+                    }
+                }
+            }
+            if (cacheModeEnum == null) {
+                throw new IllegalArgumentException("Unknown Ignite cacheMode: " + cacheMode);
+            }
+
+            Object server = serverClass
+                .getConstructor(String.class, cacheModeClass, String.class)
+                .newInstance(cacheName, cacheModeEnum, instanceName);
+
+            serverClass.getMethod("startAsync").invoke(server);
+
+            LOG.info("Embedded Ignite server started successfully");
+
+        } catch (ClassNotFoundException e) {
+            // Pass the exception along so the log shows which class was missing.
+            LOG.warn("Ignite server class not found - skipping embedded server startup. " +
+                    "Make sure tika-pipes-ignite plugin is loaded.", e);
+        } catch (java.lang.reflect.InvocationTargetException e) {
+            // Reflection wraps whatever the constructor/startAsync threw; report the real cause.
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            LOG.error("Failed to start embedded Ignite server", cause);
+            throw new RuntimeException("Failed to start Ignite server", cause);
+        } catch (Exception e) {
+            LOG.error("Failed to start embedded Ignite server", e);
+            throw new RuntimeException("Failed to start Ignite server", e);
+        }
+    }
 
     @Override
     public void fetchAndParseServerSideStreaming(FetchAndParseRequest request,
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
index cd65fb858..b76ea4eab 100644
--- a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/IgniteConfigStore.java
@@ -97,17 +97,22 @@ public class IgniteConfigStore implements ConfigStore {
                 cacheName, cacheMode, igniteInstanceName);
 
         IgniteConfiguration cfg = new IgniteConfiguration();
-        cfg.setIgniteInstanceName(igniteInstanceName);
-        cfg.setClientMode(false);
+        cfg.setIgniteInstanceName(igniteInstanceName + "-Client");
+        cfg.setClientMode(true);  // Client mode - connects to embedded server
 
         ignite = Ignition.start(cfg);
 
-        CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = new 
CacheConfiguration<>(cacheName);
-        cacheCfg.setCacheMode(cacheMode);
-        cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
-
-        cache = ignite.getOrCreateCache(cacheCfg);
-        LOG.info("IgniteConfigStore initialized successfully");
+        // Get cache (it should already exist on the server)
+        cache = ignite.cache(cacheName);
+        if (cache == null) {
+            // If not found, create it (shouldn't happen if server started 
first)
+            LOG.warn("Cache {} not found on server, creating it", cacheName);
+            CacheConfiguration<String, ExtensionConfigDTO> cacheCfg = new 
CacheConfiguration<>(cacheName);
+            cacheCfg.setCacheMode(cacheMode);
+            cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
+            cache = ignite.getOrCreateCache(cacheCfg);
+        }
+        LOG.info("IgniteConfigStore initialized successfully as client");
     }
 
     @Override
diff --git a/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
new file mode 100644
index 000000000..498e063b3
--- /dev/null
+++ b/tika-pipes/tika-pipes-plugins/tika-pipes-ignite/src/main/java/org/apache/tika/pipes/ignite/server/IgniteStoreServer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite.server;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.ignite.ExtensionConfigDTO;
+
+/**
+ * Embedded Ignite server that hosts the distributed cache.
+ * The node is booted from a background daemon thread within the hosting process;
+ * Tika gRPC and forked PipesServer instances connect as clients.
+ * <p>
+ * {@link #startAsync()} blocks until the boot attempt has finished (or a timeout elapses)
+ * and reports boot failures to the caller instead of only logging them.
+ */
+public class IgniteStoreServer implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IgniteStoreServer.class);
+    private static final String DEFAULT_CACHE_NAME = "tika-config-store";
+    private static final String DEFAULT_INSTANCE_NAME = "TikaIgniteServer";
+    /** Upper bound, in milliseconds, that {@link #startAsync()} waits for the boot attempt. */
+    private static final long STARTUP_TIMEOUT_MS = 30_000L;
+
+    // Written by the bootstrap thread, read by callers of isRunning()/close(): must be volatile.
+    private volatile Ignite ignite;
+    // Exception raised by the most recent boot attempt, or null if none was raised.
+    private volatile Exception startupFailure;
+    private final String cacheName;
+    private final CacheMode cacheMode;
+    private final String instanceName;
+
+    /**
+     * Creates a server with the default cache name, REPLICATED cache mode and the default
+     * instance name.
+     */
+    public IgniteStoreServer() {
+        this(DEFAULT_CACHE_NAME, CacheMode.REPLICATED, DEFAULT_INSTANCE_NAME);
+    }
+
+    /**
+     * @param cacheName    name of the cache to create on the server node
+     * @param cacheMode    Ignite cache mode; PARTITIONED caches get one backup, others none
+     * @param instanceName Ignite instance name for the server node
+     */
+    public IgniteStoreServer(String cacheName, CacheMode cacheMode, String instanceName) {
+        this.cacheName = cacheName;
+        this.cacheMode = cacheMode;
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * Start the Ignite server node in a background daemon thread and wait for the boot
+     * attempt to finish.
+     * <p>
+     * The calling thread waits on the bootstrap thread for at most
+     * {@code STARTUP_TIMEOUT_MS} milliseconds rather than sleeping for a fixed interval,
+     * so it returns as soon as the node is up. If the boot is still in progress when the
+     * timeout elapses, a warning is logged and the method returns. If the calling thread is
+     * interrupted while waiting, its interrupt flag is restored and the method returns.
+     *
+     * @throws IllegalStateException if the boot attempt finished without a running node
+     */
+    public void startAsync() {
+        startupFailure = null;
+        Thread serverThread = new Thread(() -> {
+            try {
+                start();
+            } catch (Exception e) {
+                startupFailure = e;
+                LOG.error("Failed to start Ignite server", e);
+            }
+        }, "IgniteServerThread");
+        serverThread.setDaemon(true);
+        serverThread.start();
+
+        // Wait for server to initialize: the bootstrap thread ends when start() returns or throws.
+        try {
+            serverThread.join(STARTUP_TIMEOUT_MS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        if (serverThread.isAlive()) {
+            LOG.warn("Ignite server {} is still starting after {} ms",
+                instanceName, STARTUP_TIMEOUT_MS);
+            return;
+        }
+        // start() publishes the node only after complete success, so a finished bootstrap
+        // thread with no node means the boot failed (startupFailure may be null if an Error
+        // terminated the thread).
+        if (ignite == null) {
+            throw new IllegalStateException(
+                "Failed to start Ignite server: " + instanceName, startupFailure);
+        }
+    }
+
+    /**
+     * Boots the server-mode node and creates the cache. The node is stored in the
+     * {@code ignite} field only once the cache exists; if cache creation fails the
+     * half-started node is closed so it does not leak.
+     */
+    private void start() throws Exception {
+        LOG.info("Starting Ignite server: instance={}, cache={}, mode={}",
+            instanceName, cacheName, cacheMode);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+        cfg.setIgniteInstanceName(instanceName);
+        cfg.setClientMode(false); // Server mode
+        cfg.setPeerClassLoadingEnabled(true);
+
+        Ignite node = Ignition.start(cfg);
+        try {
+            CacheConfiguration<String, ExtensionConfigDTO> cacheCfg =
+                new CacheConfiguration<>(cacheName);
+            cacheCfg.setCacheMode(cacheMode);
+            cacheCfg.setBackups(cacheMode == CacheMode.PARTITIONED ? 1 : 0);
+
+            IgniteCache<String, ExtensionConfigDTO> cache = node.getOrCreateCache(cacheCfg);
+
+            LOG.info("Ignite server started successfully with cache: {}", cache.getName());
+            LOG.info("Ignite topology: {} nodes", node.cluster().nodes().size());
+        } catch (RuntimeException e) {
+            try {
+                node.close();
+            } catch (RuntimeException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        ignite = node;
+    }
+
+    /**
+     * @return {@code true} once the node has been started and its cache created, and until
+     *         {@link #close()} is called
+     */
+    public boolean isRunning() {
+        return ignite != null;
+    }
+
+    /**
+     * Stops the server node if one is running; does nothing otherwise.
+     */
+    @Override
+    public void close() {
+        Ignite node = ignite;
+        if (node != null) {
+            LOG.info("Stopping Ignite server: {}", instanceName);
+            node.close();
+            ignite = null;
+        }
+    }
+}

Reply via email to