[
https://issues.apache.org/jira/browse/TIKA-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18062235#comment-18062235
]
ASF GitHub Bot commented on TIKA-4606:
--------------------------------------
nddipiazza commented on code in PR #2655:
URL: https://github.com/apache/tika/pull/2655#discussion_r2874011598
##########
tika-e2e-tests/README.md:
##########
@@ -0,0 +1,59 @@
+# Apache Tika End-to-End Tests
+
+End-to-end integration tests for Apache Tika components.
+
+## Overview
+
+This module contains standalone end-to-end (E2E) tests for various Apache Tika distribution formats and deployment modes. Unlike unit and integration tests in the main Tika build, these E2E tests validate complete deployment scenarios using Docker containers and real-world test data.
+
+**Note:** This module is intentionally **NOT** included in the main Tika parent POM. It is designed to be built and run independently to avoid slowing down the primary build process.
+
Review Comment:
Fixed — updated README to reflect that the module is now included under the
`-Pe2e` Maven profile.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+
+/**
+ * Base class for Tika gRPC end-to-end tests.
+ * Can run with either local server (default in CI) or Docker Compose.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+public abstract class ExternalTestBase {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ public static final int MAX_STARTUP_TIMEOUT = 120;
+ public static final String GOV_DOCS_FOLDER = "/tika/govdocs1";
+ public static final File TEST_FOLDER = new File("target", "govdocs1");
+ public static final int GOV_DOCS_FROM_IDX =
Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1"));
+ public static final int GOV_DOCS_TO_IDX =
Integer.parseInt(System.getProperty("govdocs1.toIndex", "1"));
+ public static final String DIGITAL_CORPORA_ZIP_FILES_URL =
"https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles";
+ private static final boolean USE_LOCAL_SERVER =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
+ private static final int GRPC_PORT =
Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052"));
+
+ public static DockerComposeContainer<?> composeContainer;
+ private static Process localGrpcProcess;
+
+ @BeforeAll
+ static void setup() throws Exception {
+ loadGovdocs1();
+
+ if (USE_LOCAL_SERVER) {
+ startLocalGrpcServer();
+ } else {
+ startDockerGrpcServer();
+ }
+ }
+
+ private static void startLocalGrpcServer() throws Exception {
+ log.info("Starting local tika-grpc server using Maven exec");
+
+ Path tikaGrpcDir = findTikaGrpcDirectory();
+ Path configFile =
Path.of("src/test/resources/tika-config.json").toAbsolutePath();
+
+ if (!Files.exists(configFile)) {
+ throw new IllegalStateException("Config file not found: " +
configFile);
+ }
+
+ log.info("Using tika-grpc from: {}", tikaGrpcDir);
+ log.info("Using config file: {}", configFile);
+
+ String javaHome = System.getProperty("java.home");
+ boolean isWindows =
System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
+ String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" :
"/bin/java");
+ String mvnCmd = isWindows ? "mvn.cmd" : "mvn";
+
+ ProcessBuilder pb = new ProcessBuilder(
+ mvnCmd,
+ "exec:exec",
+ "-Dexec.executable=" + javaCmd,
+ "-Dexec.args=" +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+ "-classpath %classpath " +
+ "org.apache.tika.pipes.grpc.TikaGrpcServer " +
+ "-c " + configFile + " " +
+ "-p " + GRPC_PORT
+ );
Review Comment:
Fixed — now uses `mvnw`/`mvnw.cmd` resolved from the located Tika root
directory instead of bare `mvn`.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java:
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.SaveFetcherReply;
+import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig;
+
+/**
+ * End-to-end test for Ignite ConfigStore.
+ * Tests that fetchers saved via gRPC are persisted in Ignite
+ * and available in the forked PipesServer process.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and
Docker/Testcontainers not supported on Windows CI")
+class IgniteConfigStoreTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int MAX_STARTUP_TIMEOUT = 120;
+ private static final File TEST_FOLDER = new File("target", "govdocs1");
+ private static final int GOV_DOCS_FROM_IDX =
Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1"));
+ private static final int GOV_DOCS_TO_IDX =
Integer.parseInt(System.getProperty("govdocs1.toIndex", "1"));
+ private static final String DIGITAL_CORPORA_ZIP_FILES_URL =
"https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles";
+ private static final boolean USE_LOCAL_SERVER =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
+ private static final int GRPC_PORT =
Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052"));
+
+ private static DockerComposeContainer<?> igniteComposeContainer;
+ private static Process localGrpcProcess;
+
+ @BeforeAll
+ static void setupIgnite() throws Exception {
+ // Clean up any orphaned processes from previous runs
+ if (USE_LOCAL_SERVER) {
+ log.info("Cleaning up any orphaned processes from previous runs");
+ try {
+ killProcessOnPort(GRPC_PORT);
+ killProcessOnPort(3344);
+ killProcessOnPort(10800);
+ } catch (Exception e) {
+ log.debug("No orphaned processes to clean up");
+ }
+ }
+
+ // Load govdocs1 if not already loaded
+ if (!TEST_FOLDER.exists() || TEST_FOLDER.listFiles().length == 0) {
+ downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX);
+ }
+
+ if (USE_LOCAL_SERVER) {
+ startLocalGrpcServer();
+ } else {
+ startDockerGrpcServer();
+ }
+ }
+
+ private static void startLocalGrpcServer() throws Exception {
+ log.info("Starting local tika-grpc server using Maven");
+
+ // Find the tika root directory - it should contain both tika-grpc and
tika-e2e-tests
+ Path currentDir = Path.of("").toAbsolutePath();
+ Path tikaRootDir = currentDir;
+
+ // Navigate up to find the directory that contains both tika-grpc and
tika-e2e-tests
+ while (tikaRootDir != null &&
+ !(Files.exists(tikaRootDir.resolve("tika-grpc")) &&
+ Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) {
+ tikaRootDir = tikaRootDir.getParent();
+ }
+
+ if (tikaRootDir == null) {
+ throw new IllegalStateException("Cannot find tika root directory.
" +
+ "Current dir: " + currentDir + ". " +
+ "Please run from within the tika project.");
+ }
+
+ Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc");
+ if (!Files.exists(tikaGrpcDir)) {
+ throw new IllegalStateException("Cannot find tika-grpc directory
at: " + tikaGrpcDir);
+ }
+
+ // Use different config for local vs Docker
+ String configFileName = "tika-config-ignite-local.json";
+ Path configFile = Path.of("src/test/resources/" +
configFileName).toAbsolutePath();
+
+ if (!Files.exists(configFile)) {
+ throw new IllegalStateException("Config file not found: " +
configFile);
+ }
+
+ log.info("Tika root: {}", tikaRootDir);
+ log.info("Using tika-grpc from: {}", tikaGrpcDir);
+ log.info("Using config file: {}", configFile);
+
+ // Use mvn exec:exec to run as external process (not exec:java which
breaks ServiceLoader)
+ String javaHome = System.getProperty("java.home");
+ String javaCmd = javaHome + "/bin/java";
+
+ ProcessBuilder pb = new ProcessBuilder(
+ "mvn",
+ "exec:exec",
+ "-Dexec.executable=" + javaCmd,
+ "-Dexec.args=" +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
+ "--add-opens=java.base/java.io=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/java.math=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
" +
+ "--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
" +
+ "--add-opens=java.base/java.time=ALL-UNNAMED " +
+ "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED " +
+ "--add-opens=java.base/jdk.internal.access=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+
"--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED " +
+
"--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED " +
+ "-Dio.netty.tryReflectionSetAccessible=true " +
+ "-Dignite.work.dir=" +
tikaGrpcDir.resolve("target/ignite-work") + " " +
+ "-classpath %classpath " +
+ "org.apache.tika.pipes.grpc.TikaGrpcServer " +
+ "-c " + configFile + " " +
+ "-p " + GRPC_PORT
+ );
+
+ pb.directory(tikaGrpcDir.toFile());
+ pb.redirectErrorStream(true);
+ pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
+
+ localGrpcProcess = pb.start();
+
+ // Track whether Ignite has started
+ final boolean[] igniteStarted = {false};
+
+ // Start a thread to consume and log output, watching for Ignite
startup
+ Thread logThread = new Thread(() -> {
+ try (java.io.BufferedReader reader = new java.io.BufferedReader(
+ new
java.io.InputStreamReader(localGrpcProcess.getInputStream(),
java.nio.charset.StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ log.info("tika-grpc: {}", line);
+
+ // Look for signs that Ignite has fully started
+ if (line.contains("Ignite server started") ||
+ line.contains("Table") && line.contains("created
successfully") ||
+ line.contains("Server started, listening on")) {
+ synchronized (igniteStarted) {
+ igniteStarted[0] = true;
+ igniteStarted.notifyAll();
+ }
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error reading server output", e);
+ }
+ });
+ logThread.setDaemon(true);
+ logThread.start();
+
+ // Wait for Ignite to start - check both log messages and gRPC
connectivity
+ log.info("Waiting for local gRPC server and Ignite to start (timeout:
180 seconds)...");
+
+ try {
+ org.awaitility.Awaitility.await()
+ .atMost(java.time.Duration.ofSeconds(180))
+ .pollInterval(java.time.Duration.ofSeconds(2))
+ .until(() -> {
+ boolean igniteReady;
+ synchronized (igniteStarted) {
+ igniteReady = igniteStarted[0];
+ }
+
+ if (!igniteReady) {
+ log.debug("Waiting for Ignite to start...");
+ return false;
+ }
+
+ // Try to actually test gRPC readiness with a real
(lightweight) call
+ try {
+ ManagedChannel testChannel = ManagedChannelBuilder
+ .forAddress("localhost", GRPC_PORT)
+ .usePlaintext()
+ .build();
+
+ try {
+ // Try to use the health check service
+ io.grpc.health.v1.HealthGrpc.HealthBlockingStub
healthStub =
+
io.grpc.health.v1.HealthGrpc.newBlockingStub(testChannel)
+ .withDeadlineAfter(2, TimeUnit.SECONDS);
+
+ io.grpc.health.v1.HealthCheckResponse response =
healthStub.check(
+
io.grpc.health.v1.HealthCheckRequest.getDefaultInstance());
+
+ boolean serving = response.getStatus() ==
+
io.grpc.health.v1.HealthCheckResponse.ServingStatus.SERVING;
+
+ if (serving) {
+ log.info("gRPC server is healthy and
serving!");
+ return true;
+ } else {
+ log.debug("gRPC server responding but not
serving yet: {}", response.getStatus());
+ return false;
+ }
+ } finally {
+ testChannel.shutdown();
+ testChannel.awaitTermination(1, TimeUnit.SECONDS);
+ }
+ } catch (io.grpc.StatusRuntimeException e) {
+ if (e.getStatus().getCode() ==
io.grpc.Status.Code.UNIMPLEMENTED) {
+ // Health check not implemented, just verify
channel works
+ log.info("Health check not available, assuming
server is ready");
+ return true;
+ }
+ log.debug("gRPC server not ready yet: {}",
e.getMessage());
+ return false;
+ } catch (Exception e) {
+ log.debug("gRPC server not ready yet: {}",
e.getMessage());
+ return false;
+ }
+ });
+
+ log.info("Both gRPC server and Ignite are ready!");
+ } catch (org.awaitility.core.ConditionTimeoutException e) {
+ if (localGrpcProcess.isAlive()) {
+ localGrpcProcess.destroyForcibly();
+ }
+ throw new RuntimeException("Local gRPC server or Ignite failed to
start within timeout", e);
+ }
+
+ log.info("Local tika-grpc server started successfully on port {}",
GRPC_PORT);
+ }
+
+
+ private static void startDockerGrpcServer() {
+ log.info("Starting Docker Compose tika-grpc server");
+
+ igniteComposeContainer = new DockerComposeContainer<>(
+ new File("src/test/resources/docker-compose-ignite.yml"))
+ .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath())
+ .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT,
ChronoUnit.SECONDS))
+ .withExposedService("tika-grpc", 50052,
+ Wait.forLogMessage(".*Server started.*\\n", 1))
+ .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log));
+
+ igniteComposeContainer.start();
+
Review Comment:
Fixed — Docker mode now requires a `tika.docker.compose.ignite.file` system
property and throws a clear error if not set, rather than silently referencing
a non-existent file.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java:
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.SaveFetcherReply;
+import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig;
+
+/**
+ * End-to-end test for Ignite ConfigStore.
+ * Tests that fetchers saved via gRPC are persisted in Ignite
+ * and available in the forked PipesServer process.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and
Docker/Testcontainers not supported on Windows CI")
+class IgniteConfigStoreTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int MAX_STARTUP_TIMEOUT = 120;
+ private static final File TEST_FOLDER = new File("target", "govdocs1");
+ private static final int GOV_DOCS_FROM_IDX =
Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1"));
+ private static final int GOV_DOCS_TO_IDX =
Integer.parseInt(System.getProperty("govdocs1.toIndex", "1"));
+ private static final String DIGITAL_CORPORA_ZIP_FILES_URL =
"https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles";
+ private static final boolean USE_LOCAL_SERVER =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
+ private static final int GRPC_PORT =
Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052"));
+
+ private static DockerComposeContainer<?> igniteComposeContainer;
+ private static Process localGrpcProcess;
+
+ @BeforeAll
+ static void setupIgnite() throws Exception {
+ // Clean up any orphaned processes from previous runs
+ if (USE_LOCAL_SERVER) {
+ log.info("Cleaning up any orphaned processes from previous runs");
+ try {
+ killProcessOnPort(GRPC_PORT);
+ killProcessOnPort(3344);
+ killProcessOnPort(10800);
+ } catch (Exception e) {
+ log.debug("No orphaned processes to clean up");
+ }
+ }
+
+ // Load govdocs1 if not already loaded
+ if (!TEST_FOLDER.exists() || TEST_FOLDER.listFiles().length == 0) {
+ downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX);
+ }
+
+ if (USE_LOCAL_SERVER) {
+ startLocalGrpcServer();
+ } else {
+ startDockerGrpcServer();
+ }
+ }
+
+ private static void startLocalGrpcServer() throws Exception {
+ log.info("Starting local tika-grpc server using Maven");
+
+ // Find the tika root directory - it should contain both tika-grpc and
tika-e2e-tests
+ Path currentDir = Path.of("").toAbsolutePath();
+ Path tikaRootDir = currentDir;
+
+ // Navigate up to find the directory that contains both tika-grpc and
tika-e2e-tests
+ while (tikaRootDir != null &&
+ !(Files.exists(tikaRootDir.resolve("tika-grpc")) &&
+ Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) {
+ tikaRootDir = tikaRootDir.getParent();
+ }
+
+ if (tikaRootDir == null) {
+ throw new IllegalStateException("Cannot find tika root directory.
" +
+ "Current dir: " + currentDir + ". " +
+ "Please run from within the tika project.");
+ }
+
+ Path tikaGrpcDir = tikaRootDir.resolve("tika-grpc");
+ if (!Files.exists(tikaGrpcDir)) {
+ throw new IllegalStateException("Cannot find tika-grpc directory
at: " + tikaGrpcDir);
+ }
+
+ // Use different config for local vs Docker
+ String configFileName = "tika-config-ignite-local.json";
+ Path configFile = Path.of("src/test/resources/" +
configFileName).toAbsolutePath();
+
+ if (!Files.exists(configFile)) {
+ throw new IllegalStateException("Config file not found: " +
configFile);
+ }
+
+ log.info("Tika root: {}", tikaRootDir);
+ log.info("Using tika-grpc from: {}", tikaGrpcDir);
+ log.info("Using config file: {}", configFile);
+
+ // Use mvn exec:exec to run as external process (not exec:java which
breaks ServiceLoader)
+ String javaHome = System.getProperty("java.home");
+ String javaCmd = javaHome + "/bin/java";
+
+ ProcessBuilder pb = new ProcessBuilder(
+ "mvn",
+ "exec:exec",
+ "-Dexec.executable=" + javaCmd,
+ "-Dexec.args=" +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
Review Comment:
Fixed — now uses `mvnw`/`mvnw.cmd` resolved from the Tika root directory.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/filesystem/FileSystemFetcherTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.filesystem;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.SaveFetcherReply;
+import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.ExternalTestBase;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig;
+
+@Slf4j
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "exec:exec classpath
exceeds Windows CreateProcess command-line length limit")
+class FileSystemFetcherTest extends ExternalTestBase {
+
+ @Test
+ void testFileSystemFetcher() throws Exception {
+ String fetcherId = "defaultFetcher";
+ ManagedChannel channel = getManagedChannel();
+ TikaGrpc.TikaBlockingStub blockingStub =
TikaGrpc.newBlockingStub(channel);
+ TikaGrpc.TikaStub tikaStub = TikaGrpc.newStub(channel);
+
+ // Create and save the fetcher dynamically
+ FileSystemFetcherConfig config = new FileSystemFetcherConfig();
+ // Use local path when running in local mode, Docker path otherwise
+ boolean useLocalServer =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
Review Comment:
Fixed — default changed to `true` to match the module POM default.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+
+/**
+ * Base class for Tika gRPC end-to-end tests.
+ * Can run with either local server (default in CI) or Docker Compose.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+public abstract class ExternalTestBase {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ public static final int MAX_STARTUP_TIMEOUT = 120;
+ public static final String GOV_DOCS_FOLDER = "/tika/govdocs1";
+ public static final File TEST_FOLDER = new File("target", "govdocs1");
+ public static final int GOV_DOCS_FROM_IDX =
Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1"));
+ public static final int GOV_DOCS_TO_IDX =
Integer.parseInt(System.getProperty("govdocs1.toIndex", "1"));
+ public static final String DIGITAL_CORPORA_ZIP_FILES_URL =
"https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles";
+ private static final boolean USE_LOCAL_SERVER =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
+ private static final int GRPC_PORT =
Integer.parseInt(System.getProperty("tika.e2e.grpcPort", "50052"));
+
+ public static DockerComposeContainer<?> composeContainer;
+ private static Process localGrpcProcess;
+
+ @BeforeAll
+ static void setup() throws Exception {
+ loadGovdocs1();
+
+ if (USE_LOCAL_SERVER) {
+ startLocalGrpcServer();
+ } else {
+ startDockerGrpcServer();
+ }
+ }
+
+ private static void startLocalGrpcServer() throws Exception {
+ log.info("Starting local tika-grpc server using Maven exec");
+
+ Path tikaGrpcDir = findTikaGrpcDirectory();
+ Path configFile =
Path.of("src/test/resources/tika-config.json").toAbsolutePath();
+
+ if (!Files.exists(configFile)) {
+ throw new IllegalStateException("Config file not found: " +
configFile);
+ }
+
+ log.info("Using tika-grpc from: {}", tikaGrpcDir);
+ log.info("Using config file: {}", configFile);
+
+ String javaHome = System.getProperty("java.home");
+ boolean isWindows =
System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
+ String javaCmd = javaHome + (isWindows ? "\\bin\\java.exe" :
"/bin/java");
+ String mvnCmd = isWindows ? "mvn.cmd" : "mvn";
+
+ ProcessBuilder pb = new ProcessBuilder(
+ mvnCmd,
+ "exec:exec",
+ "-Dexec.executable=" + javaCmd,
+ "-Dexec.args=" +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+ "-classpath %classpath " +
+ "org.apache.tika.pipes.grpc.TikaGrpcServer " +
+ "-c " + configFile + " " +
+ "-p " + GRPC_PORT
+ );
+
+ pb.directory(tikaGrpcDir.toFile());
+ pb.redirectErrorStream(true);
+ pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
+
+ localGrpcProcess = pb.start();
+
+ // Start thread to consume output
+ Thread logThread = new Thread(() -> {
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(localGrpcProcess.getInputStream(),
StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ log.info("tika-grpc: {}", line);
+ }
+ } catch (IOException e) {
+ log.error("Error reading server output", e);
+ }
+ });
+ logThread.setDaemon(true);
+ logThread.start();
+
+ // Wait for server to be ready
+ waitForServerReady();
+
+ log.info("Local tika-grpc server started successfully on port {}",
GRPC_PORT);
+ }
+
+ private static Path findTikaGrpcDirectory() {
+ Path currentDir = Path.of("").toAbsolutePath();
+ Path tikaRootDir = currentDir;
+
+ while (tikaRootDir != null &&
+ !(Files.exists(tikaRootDir.resolve("tika-grpc")) &&
+ Files.exists(tikaRootDir.resolve("tika-e2e-tests")))) {
+ tikaRootDir = tikaRootDir.getParent();
+ }
+
+ if (tikaRootDir == null) {
+ throw new IllegalStateException("Cannot find tika root directory.
" +
+ "Current dir: " + currentDir);
+ }
+
+ return tikaRootDir.resolve("tika-grpc");
+ }
+
+ private static void waitForServerReady() throws Exception {
+ int maxAttempts = 60;
+ for (int i = 0; i < maxAttempts; i++) {
+ try {
+ ManagedChannel testChannel = ManagedChannelBuilder
+ .forAddress("localhost", GRPC_PORT)
+ .usePlaintext()
+ .build();
+
+ try {
+ // Try a simple connection
+ testChannel.getState(true);
+ TimeUnit.MILLISECONDS.sleep(100);
+ if
(testChannel.getState(false).toString().contains("READY")) {
+ log.info("gRPC server is ready!");
+ return;
+ }
+ } finally {
+ testChannel.shutdown();
+ testChannel.awaitTermination(1, TimeUnit.SECONDS);
+ }
+ } catch (Exception e) {
+ // Server not ready yet
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ if (localGrpcProcess != null && localGrpcProcess.isAlive()) {
+ localGrpcProcess.destroyForcibly();
+ }
+ throw new RuntimeException("Local gRPC server failed to start within
timeout");
+ }
+
+ private static void startDockerGrpcServer() {
+ log.info("Starting Docker Compose tika-grpc server");
+
+ composeContainer = new DockerComposeContainer<>(
+ new File("src/test/resources/docker-compose.yml"))
+ .withEnv("HOST_GOVDOCS1_DIR", TEST_FOLDER.getAbsolutePath())
+ .withStartupTimeout(Duration.of(MAX_STARTUP_TIMEOUT,
ChronoUnit.SECONDS))
+ .withExposedService("tika-grpc", 50052,
+ Wait.forLogMessage(".*Server started.*\\n", 1))
+ .withLogConsumer("tika-grpc", new Slf4jLogConsumer(log));
+
+ composeContainer.start();
+
+ log.info("Docker Compose containers started successfully");
+ }
+
+ private static void loadGovdocs1() throws IOException,
InterruptedException {
+ int retries = 3;
+ int attempt = 0;
+ while (true) {
+ try {
+ downloadAndUnzipGovdocs1(GOV_DOCS_FROM_IDX, GOV_DOCS_TO_IDX);
+ break;
+ } catch (IOException e) {
+ attempt++;
+ if (attempt >= retries) {
+ throw e;
+ }
+ log.warn("Download attempt {} failed, retrying in 10
seconds...", attempt, e);
+ TimeUnit.SECONDS.sleep(10);
+ }
+ }
+ }
+
+ @AfterAll
+ void close() {
+ if (USE_LOCAL_SERVER && localGrpcProcess != null) {
+ log.info("Stopping local gRPC server");
+ localGrpcProcess.destroy();
+ try {
+ if (!localGrpcProcess.waitFor(10, TimeUnit.SECONDS)) {
+ localGrpcProcess.destroyForcibly();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ localGrpcProcess.destroyForcibly();
+ }
+ } else if (composeContainer != null) {
+ composeContainer.close();
+ }
+ }
+
+ public static void downloadAndUnzipGovdocs1(int fromIndex, int toIndex)
throws IOException {
+ Path targetDir = TEST_FOLDER.toPath();
+ Files.createDirectories(targetDir);
+
+ for (int i = fromIndex; i <= toIndex; i++) {
+ String zipName = String.format(java.util.Locale.ROOT, "%03d.zip",
i);
+ String url = DIGITAL_CORPORA_ZIP_FILES_URL + "/" + zipName;
+ Path zipPath = targetDir.resolve(zipName);
+
+ if (Files.exists(zipPath)) {
+ log.info("{} already exists, skipping download", zipName);
+ continue;
+ }
+
+ log.info("Downloading {} from {}...", zipName, url);
+ try (InputStream in = new URL(url).openStream()) {
+ Files.copy(in, zipPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ log.info("Unzipping {}...", zipName);
+ try (ZipInputStream zis = new ZipInputStream(new
FileInputStream(zipPath.toFile()))) {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) != null) {
+ Path outPath = targetDir.resolve(entry.getName());
+ if (entry.isDirectory()) {
+ Files.createDirectories(outPath);
+ } else {
+ Files.createDirectories(outPath.getParent());
+ try (OutputStream out =
Files.newOutputStream(outPath)) {
+ zis.transferTo(out);
+ }
+ }
+ zis.closeEntry();
+ }
+ }
+ }
+
+ log.info("Finished downloading and extracting govdocs1 files");
+ }
+
+ public static void assertAllFilesFetched(Path baseDir,
List<FetchAndParseReply> successes,
+ List<FetchAndParseReply> errors) {
+ Set<String> allFetchKeys = new HashSet<>();
+ for (FetchAndParseReply reply : successes) {
+ allFetchKeys.add(reply.getFetchKey());
+ }
+ for (FetchAndParseReply reply : errors) {
+ allFetchKeys.add(reply.getFetchKey());
+ }
+
+ Set<String> keysFromGovdocs1 = new HashSet<>();
+ try (Stream<Path> paths = Files.walk(baseDir)) {
+ paths.filter(Files::isRegularFile)
+ .forEach(file -> {
+ String relPath = baseDir.relativize(file).toString();
+ if
(Pattern.compile("\\d{3}\\.zip").matcher(relPath).find()) {
+ return;
+ }
+ keysFromGovdocs1.add(relPath);
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ Assertions.assertNotEquals(0, successes.size(), "Should have some
successful fetches");
+ // Note: errors.size() can be 0 if all files parse successfully
+ log.info("Processed {} files: {} successes, {} errors",
allFetchKeys.size(), successes.size(), errors.size());
+ Assertions.assertEquals(keysFromGovdocs1, allFetchKeys, () -> {
+ Set<String> missing = new HashSet<>(keysFromGovdocs1);
+ missing.removeAll(allFetchKeys);
+ return "Missing fetch keys: " + missing;
+ });
+ }
+
+ public static ManagedChannel getManagedChannel() {
+ if (USE_LOCAL_SERVER) {
+ return ManagedChannelBuilder
+ .forAddress("localhost", GRPC_PORT)
+ .usePlaintext()
+ .executor(Executors.newCachedThreadPool())
+ .maxInboundMessageSize(160 * 1024 * 1024)
+ .build();
Review Comment:
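Fixed — removed `Executors.newCachedThreadPool()` from the channel builder.
The pool was created inline and never shut down (gRPC does not take ownership
of a caller-supplied executor), so its threads leaked. gRPC will now use its
default executor.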
##########
tika-e2e-tests/tika-grpc/src/test/resources/tika-config-ignite-local.json:
##########
@@ -0,0 +1,52 @@
+{
+ "plugin-roots": ["/var/cache/tika/plugins"],
+ "pipes": {
+ "numClients": 1,
+ "configStoreType": "ignite",
+ "configStoreParams": "{\"tableName\": \"tika_e2e_test\",
\"igniteInstanceName\": \"TikaE2ETest\", \"replicas\": 2, \"partitions\": 10,
\"autoClose\": true}",
Review Comment:
Fixed — changed `replicas` from 2 to 1 for the single-node local test mode.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ExternalTestBase.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+
+/**
+ * Base class for Tika gRPC end-to-end tests.
+ * Can run with either local server (default in CI) or Docker Compose.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+public abstract class ExternalTestBase {
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ public static final int MAX_STARTUP_TIMEOUT = 120;
+ public static final String GOV_DOCS_FOLDER = "/tika/govdocs1";
+ public static final File TEST_FOLDER = new File("target", "govdocs1");
+ public static final int GOV_DOCS_FROM_IDX =
Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1"));
+ public static final int GOV_DOCS_TO_IDX =
Integer.parseInt(System.getProperty("govdocs1.toIndex", "1"));
+ public static final String DIGITAL_CORPORA_ZIP_FILES_URL =
"https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles";
+ private static final boolean USE_LOCAL_SERVER =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
Review Comment:
Fixed — ExternalTestBase already reads the `tika.e2e.useLocalServer` system
property, and the POM sets it to `true`, which is the value CI passes. The
local default in `FileSystemFetcherTest` was also corrected to `true`.
##########
tika-e2e-tests/tika-grpc/src/test/java/org/apache/tika/pipes/ignite/IgniteConfigStoreTest.java:
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.ignite;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.SaveFetcherReply;
+import org.apache.tika.SaveFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcherConfig;
+
+/**
+ * End-to-end test for Ignite ConfigStore.
+ * Tests that fetchers saved via gRPC are persisted in Ignite
+ * and available in the forked PipesServer process.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Testcontainers
+@Slf4j
+@Tag("E2ETest")
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Maven not on PATH and
Docker/Testcontainers not supported on Windows CI")
+class IgniteConfigStoreTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int MAX_STARTUP_TIMEOUT = 120;
+ private static final File TEST_FOLDER = new File("target", "govdocs1");
+ private static final int GOV_DOCS_FROM_IDX =
Integer.parseInt(System.getProperty("govdocs1.fromIndex", "1"));
+ private static final int GOV_DOCS_TO_IDX =
Integer.parseInt(System.getProperty("govdocs1.toIndex", "1"));
+ private static final String DIGITAL_CORPORA_ZIP_FILES_URL =
"https://corp.digitalcorpora.org/corpora/files/govdocs1/zipfiles";
+ private static final boolean USE_LOCAL_SERVER =
Boolean.parseBoolean(System.getProperty("tika.e2e.useLocalServer", "false"));
Review Comment:
Fixed — IgniteConfigStoreTest now uses the same default (`true`) as the
POM.
##########
tika-e2e-tests/tika-grpc/pom.xml:
##########
@@ -0,0 +1,173 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-e2e-tests</artifactId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>tika-grpc-e2e-test</artifactId>
+ <name>Apache Tika gRPC End-to-End Tests</name>
+ <description>End-to-end tests for Apache Tika gRPC Server using test
containers</description>
+
+ <properties>
+ <!
> Upgrade Ignite config store to Ignite 3.x with Calcite SQL engine
> -----------------------------------------------------------------
>
> Key: TIKA-4606
> URL: https://issues.apache.org/jira/browse/TIKA-4606
> Project: Tika
> Issue Type: Improvement
> Reporter: Nicholas DiPiazza
> Assignee: Nicholas DiPiazza
> Priority: Major
>
> h2. Overview
> Upgrade the tika-pipes-config-store-ignite module from Apache Ignite 2.17.0
> (which uses H2 1.4.x) to Apache Ignite 3.x (which uses the Apache Calcite SQL
> engine).
> h2. Current State
> * Module: *tika-pipes-config-store-ignite*
> * Ignite Version: 2.17.0
> * SQL Engine: H2 1.4.197 (embedded)
> * Location: {{tika-pipes/tika-pipes-config-store-ignite/}}
> h2. Goals
> # Upgrade to Apache Ignite 3.x (latest stable release)
> # Replace the H2 SQL engine with the Calcite-based SQL engine
> # Maintain all existing config store functionality
> # Update API calls to match Ignite 3.x breaking changes
> # Ensure backward compatibility for stored configurations (if possible)
> h2. Benefits
> * Modern SQL engine with Apache Calcite
> * Better performance and query optimization
> * Active maintenance and future support
> * Improved SQL feature set
> * No dependency on old H2 1.4.x (2018)
> h2. Breaking Changes to Address
> * Ignite 3.x has major API changes from 2.x
> * Configuration format changes
> * Cache API differences
> * SQL query API updates
> * Client connection changes
> h2. Implementation Steps
> # Research Ignite 3.x API changes and migration guide
> # Update Maven dependencies to Ignite 3.x
> # Refactor {{IgniteConfigStore}} to use new Ignite 3.x API
> # Update {{IgniteStoreServer}} for the new connection model
> # Modify SQL queries if needed for Calcite compatibility
> # Update configuration handling
> # Update tests to work with Ignite 3.x
> # Test backward compatibility with existing configs
> # Update documentation
> h2. Acceptance Criteria
> * Ignite upgraded to version 3.x (latest stable)
> * Uses Calcite SQL engine instead of H2
> * All existing tests pass
> * Config store functionality preserved
> * No H2 dependencies remain
> * Documentation updated
> h2. References
> * Apache Ignite 3.x: https://ignite.apache.org/docs/3.0.0/
> * Ignite 3.x Migration Guide
> * Apache Calcite: https://calcite.apache.org/
> * Current module: {{tika-pipes/tika-pipes-config-store-ignite/}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)