This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4560 in repository https://gitbox.apache.org/repos/asf/tika.git
commit e30f132ce0122655f3f03c4309dacc898da215ad Author: tallison <[email protected]> AuthorDate: Tue Dec 9 12:56:12 2025 -0500 TIKA-4560 -- clean up emit strategy --- .../test/resources/configs/config-template.json | 12 +- .../src/test/resources/kafka/plugins-template.json | 7 +- .../resources/opensearch/plugins-template.json | 5 +- .../opensearch/tika-config-opensearch.json | 5 +- .../src/test/resources/s3/plugins-template.json | 14 ++- .../src/test/resources/solr/plugins-template.json | 5 +- .../src/test/resources/tika-config-solr-urls.json | 7 +- .../test/resources/configs/config-template.json | 14 ++- .../org/apache/tika/pipes/core/EmitStrategy.java | 31 +++++ .../apache/tika/pipes/core/EmitStrategyConfig.java | 130 +++++++++++++++++++++ .../tika/pipes/core/EmitStrategyOverride.java | 82 +++++++++++++ .../org/apache/tika/pipes/core/PipesClient.java | 1 - .../org/apache/tika/pipes/core/PipesConfig.java | 38 +++--- .../apache/tika/pipes/core/server/EmitHandler.java | 29 +++-- .../apache/tika/pipes/core/server/PipesServer.java | 26 ++--- .../apache/tika/pipes/core/server/PipesWorker.java | 6 - .../resources/configs/tika-config-bad-class.json | 9 +- .../configs/tika-config-bad-java-path.json | 9 +- .../configs/tika-config-bad-jvm-args.json | 10 +- .../test/resources/configs/tika-config-basic.json | 5 +- .../configs/tika-config-crashing-detector.json | 13 ++- .../resources/configs/tika-config-emit-all.json | 8 +- .../resources/configs/tika-config-passback.json | 4 +- .../resources/configs/tika-config-truncate.json | 5 +- .../tika/server/core/resource/PipesResource.java | 13 ++- .../resources/configs/cxf-test-base-template.json | 4 +- .../resources/configs/cxf-test-base-template.json | 4 +- 27 files changed, 404 insertions(+), 92 deletions(-) diff --git a/tika-app/src/test/resources/configs/config-template.json b/tika-app/src/test/resources/configs/config-template.json index 783012094..290396b3c 100644 --- a/tika-app/src/test/resources/configs/config-template.json 
+++ b/tika-app/src/test/resources/configs/config-template.json @@ -43,7 +43,6 @@ "queueSize": 10000, "numEmitters": 1, "emitIntermediateResults": false, - "directEmitThresholdBytes": 100000, "timeoutMillis": 60000, "startupTimeoutMillis": 240000, "sleepOnStartupTimeoutMillis": 240000, @@ -52,8 +51,15 @@ "maxFilesProcessedPerProcess": 10000, "staleFetcherTimeoutSeconds": 600, "staleFetcherDelaySeconds": 60, - "forkedJvmArgs": ["-Xmx1g", "-XX:+UseG1GC"], - "javaPath": "java" + "forkedJvmArgs": [ + "-Xmx1g", + "-XX:+UseG1GC" + ], + "javaPath": "java", + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 100000 + } }, "plugin-roots": "PLUGIN_ROOTS" } diff --git a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka/plugins-template.json b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka/plugins-template.json index 5cf6b03d1..1d12138b1 100644 --- a/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka/plugins-template.json +++ b/tika-integration-tests/tika-pipes-kafka-integration-tests/src/test/resources/kafka/plugins-template.json @@ -94,7 +94,6 @@ } }, "pipes": { - "directEmitThresholdBytes": 10000, "emitMaxEstimatedBytes": 100000, "emitWithinMillis": 10, "numEmitters": 1, @@ -105,7 +104,11 @@ "-XX:+ExitOnOutOfMemoryError", "-Dlog4j.configurationFile=LOG4J_PROPERTIES_FILE" ], - "timeoutMillis": 60000 + "timeoutMillis": 60000, + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + } }, "plugin-roots": "target/plugins" } diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/plugins-template.json b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/plugins-template.json index 781e2cfa8..1b78134b4 100644 --- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/plugins-template.json +++ 
b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/plugins-template.json @@ -60,7 +60,10 @@ } }, "pipes": { - "directEmitThresholdBytes": 10000, + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + }, "emitMaxEstimatedBytes": 100000, "emitWithinMillis": 60000, "numEmitters": 1, diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.json b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.json index b15e7c437..52cf8c096 100644 --- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.json +++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.json @@ -101,7 +101,10 @@ } }, "pipes": { - "directEmitThresholdBytes": 10000, + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + }, "emitMaxEstimatedBytes": 100000, "emitWithinMillis": 60000, "numEmitters": 1, diff --git a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/s3/plugins-template.json b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/s3/plugins-template.json index 1415f1876..d8f79dd0c 100644 --- a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/s3/plugins-template.json +++ b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/resources/s3/plugins-template.json @@ -10,7 +10,12 @@ "endpointConfigurationService": "{ENDPOINT_CONFIGURATION_SERVICE}", "pathStyleAccessEnabled": true, "maxConnections": 50, - "throttleSeconds": [30, 120, 600, 1200] + "throttleSeconds": [ + 30, + 120, + 600, + 1200 + ] } } }, @@ -56,7 +61,6 @@ } }, "pipes": { - "directEmitThresholdBytes": 10000, "emitMaxEstimatedBytes": 100000, "emitWithinMillis": 10, "numEmitters": 1, @@ -67,7 +71,11 @@ 
"-XX:+ExitOnOutOfMemoryError", "-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}" ], - "timeoutMillis": 60000 + "timeoutMillis": 60000, + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + } }, "plugin-roots": "target/plugins" } diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/solr/plugins-template.json b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/solr/plugins-template.json index 94f1ec99b..864c175bf 100644 --- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/solr/plugins-template.json +++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/solr/plugins-template.json @@ -91,7 +91,10 @@ } }, "pipes": { - "directEmitThresholdBytes": 10000, + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + }, "emitMaxEstimatedBytes": 100000, "emitWithinMillis": 10, "numEmitters": 1, diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.json b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.json index 134f77962..1aaf8bfd8 100644 --- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.json +++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.json @@ -41,7 +41,6 @@ } ], "pipes": { - "directEmitThresholdBytes": 10000, "emitMaxEstimatedBytes": 100000, "emitWithinMillis": 10, "numEmitters": 1, @@ -52,7 +51,11 @@ "-XX:+ExitOnOutOfMemoryError", "-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}" ], - "timeoutMillis": 60000 + "timeoutMillis": 60000, + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 10000 + } }, "fetchers": { "file-system-fetcher": { diff --git a/tika-pipes/tika-async-cli/src/test/resources/configs/config-template.json b/tika-pipes/tika-async-cli/src/test/resources/configs/config-template.json index 
41db19628..1ce0957ef 100644 --- a/tika-pipes/tika-async-cli/src/test/resources/configs/config-template.json +++ b/tika-pipes/tika-async-cli/src/test/resources/configs/config-template.json @@ -27,7 +27,6 @@ "queueSize": 10000, "numEmitters": 1, "emitIntermediateResults": false, - "directEmitThresholdBytes": 100000, "timeoutMillis": 60000, "startupTimeoutMillis": 240000, "sleepOnStartupTimeoutMillis": 240000, @@ -36,8 +35,15 @@ "maxFilesProcessedPerProcess": 10000, "staleFetcherTimeoutSeconds": 600, "staleFetcherDelaySeconds": 60, - "forkedJvmArgs": ["-Xmx1g", "-XX:+UseG1GC"], - "javaPath": "java" + "forkedJvmArgs": [ + "-Xmx1g", + "-XX:+UseG1GC" + ], + "javaPath": "java", + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 100000 + } }, "plugin-roots": "PLUGIN_ROOTS" -} \ No newline at end of file +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategy.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategy.java new file mode 100644 index 000000000..f9ead0f3b --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategy.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.core; + +/** + * Strategy for how the forked PipesServer handles emitting data. + * <ul> + * <li>EMIT_ALL: Always emit directly from PipesServer (never pass back to client)</li> + * <li>PASSBACK_ALL: Always pass back to client for batch emitting (never emit directly)</li> + * <li>DYNAMIC: Emit directly if size >= threshold, otherwise pass back to client</li> + * </ul> + */ +public enum EmitStrategy { + EMIT_ALL, + PASSBACK_ALL, + DYNAMIC +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategyConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategyConfig.java new file mode 100644 index 000000000..a7576276b --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategyConfig.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.core; + +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.exception.TikaException; + +/** + * Configuration for emit strategy in PipesConfig. 
+ * <p> + * Example JSON configuration: + * <pre> + * { + * "pipes": { + * "emitStrategy": { + * "type": "DYNAMIC", + * "thresholdBytes": 100000 + * } + * } + * } + * </pre> + * Or for simpler strategies: + * <pre> + * { + * "pipes": { + * "emitStrategy": { + * "type": "EMIT_ALL" + * } + * } + * } + * </pre> + */ +public class EmitStrategyConfig { + + /** + * Default emit strategy for PipesServer. + * DYNAMIC means each extract's estimated size is compared against thresholdBytes. + */ + public static final EmitStrategy DEFAULT_EMIT_STRATEGY = EmitStrategy.DYNAMIC; + + /** + * Default threshold in bytes for direct emission from PipesServer. + * If an extract's estimated size is at least this large, it will be emitted + * directly from the forked PipesServer rather than passed back to PipesClient. + * Only used when emitStrategy is DYNAMIC. + */ + public static final long DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES = 100000; + + private EmitStrategy type = DEFAULT_EMIT_STRATEGY; + private Long thresholdBytes = null; + + public EmitStrategyConfig() { + } + + public EmitStrategyConfig(EmitStrategy type) { + this.type = type; + if (type == EmitStrategy.DYNAMIC) { + thresholdBytes = DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES; + } + } + + public EmitStrategyConfig(EmitStrategy type, Long thresholdBytes) throws TikaException { + this.type = type; + this.thresholdBytes = thresholdBytes; + validate(); + } + + /** + * Get the emit strategy type. + * + * @return the emit strategy + */ + public EmitStrategy getType() { + return type; + } + + /** + * Set the emit strategy type. + * + * @param type the emit strategy + */ + public void setType(EmitStrategy type) throws TikaConfigException { + this.type = type; + validate(); + } + + /** + * Get the threshold in bytes for DYNAMIC strategy. + * Only applicable when type is DYNAMIC. + * + * @return the threshold in bytes, or null to use default + */ + public Long getThresholdBytes() { + return thresholdBytes; + } + + /** + * Set the threshold in bytes for DYNAMIC strategy.
+ * Only applicable when type is DYNAMIC. + * + * @param thresholdBytes the threshold in bytes + */ + public void setThresholdBytes(Long thresholdBytes) throws TikaConfigException { + this.thresholdBytes = thresholdBytes; + validate(); + } + + private void validate() throws TikaConfigException { + if (thresholdBytes != null && + (type == EmitStrategy.EMIT_ALL || type == EmitStrategy.PASSBACK_ALL)) { + throw new TikaConfigException( + "thresholdBytes cannot be set for emit strategy type " + type + + ". thresholdBytes is only applicable for DYNAMIC strategy."); + } + } +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategyOverride.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategyOverride.java new file mode 100644 index 000000000..2b460c6e4 --- /dev/null +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/EmitStrategyOverride.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.core; + +/** + * Runtime override for emit strategy that can be set in ParseContext to override + * the default strategy from PipesConfig on a per-request basis. 
+ * <p> + * Example usage: + * <pre> + * ParseContext context = new ParseContext(); + * context.set(EmitStrategyOverride.class, + * new EmitStrategyOverride(EmitStrategy.EMIT_ALL)); + * </pre> + */ +public class EmitStrategyOverride { + + private final EmitStrategy emitStrategy; + private final Long directEmitThresholdBytes; + + /** + * Create an emit strategy override with just the strategy. + * If the strategy is DYNAMIC, the threshold from PipesConfig will be used. + * + * @param emitStrategy the emit strategy to use + */ + public EmitStrategyOverride(EmitStrategy emitStrategy) { + this(emitStrategy, null); + } + + /** + * Create an emit strategy override with both strategy and threshold. + * The threshold is only used when emitStrategy is DYNAMIC. + * + * @param emitStrategy the emit strategy to use + * @param directEmitThresholdBytes the threshold in bytes for DYNAMIC strategy (can be null to use default) + * @throws IllegalArgumentException if directEmitThresholdBytes is non-null for EMIT_ALL or PASSBACK_ALL strategies + */ + public EmitStrategyOverride(EmitStrategy emitStrategy, Long directEmitThresholdBytes) { + if (directEmitThresholdBytes != null && + (emitStrategy == EmitStrategy.EMIT_ALL || emitStrategy == EmitStrategy.PASSBACK_ALL)) { + throw new IllegalArgumentException( + "directEmitThresholdBytes cannot be set for emit strategy " + emitStrategy + + ". Threshold is only applicable for DYNAMIC strategy."); + } + this.emitStrategy = emitStrategy; + this.directEmitThresholdBytes = directEmitThresholdBytes; + } + + /** + * Get the emit strategy. + * + * @return the emit strategy + */ + public EmitStrategy getEmitStrategy() { + return emitStrategy; + } + + /** + * Get the direct emit threshold in bytes for DYNAMIC strategy. + * Returns null if not set, indicating the default from PipesConfig should be used.
+ * + * @return the threshold in bytes, or null to use default + */ + public Long getDirectEmitThresholdBytes() { + return directEmitThresholdBytes; + } +} diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java index fb89ade96..4e9250ce9 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java @@ -22,7 +22,6 @@ import static org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.TIMEOUT; import static org.apache.tika.pipes.api.PipesResult.RESULT_STATUS.UNSPECIFIED_CRASH; import static org.apache.tika.pipes.core.PipesClient.COMMANDS.ACK; import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.FINISHED; -import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.OOM; import static org.apache.tika.pipes.core.server.PipesServer.PROCESSING_STATUS.READY; import java.io.Closeable; diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java index 51f572e0a..3ff5e9dbf 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesConfig.java @@ -27,12 +27,6 @@ import org.apache.tika.exception.TikaConfigException; public class PipesConfig { - /** - * Default threshold in bytes for direct emission from PipesServer. - * If an extract is larger than this, it will be emitted - * directly from the forked PipesServer rather than passed back to PipesClient. 
- */ - public static final long DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES = 100000; public static final long DEFAULT_TIMEOUT_MILLIS = 60000; @@ -50,9 +44,12 @@ public class PipesConfig { public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; - //if an extract is larger than this, the forked PipesServer should - //emit the extract directly and not send the contents back to the PipesClient - private long directEmitThresholdBytes = DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES; + /** + * The emit strategy configuration determines how the forked PipesServer handles emitting data. + * See {@link EmitStrategyConfig} for details. + */ + private EmitStrategyConfig emitStrategy = new EmitStrategyConfig(EmitStrategyConfig.DEFAULT_EMIT_STRATEGY); + private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS; private long socketTimeoutMs = DEFAULT_SOCKET_TIMEOUT_MS; private long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS; @@ -224,22 +221,21 @@ public class PipesConfig { } /** - * What is the maximum bytes size per extract that - * will be allowed to be shipped back to the emit queue in the forking process. - * If an extract is too big, skip the emit queue and forward it directly from the - * forked PipesServer. - * If set to <code>0</code>, this will never send an extract back for batch emitting, - * but will always emit the extract directly from the forked PipeServer. - * If set to <code>-1</code>, this will always send the extract back for batch emitting. + * Get the emit strategy configuration. * - * @return the threshold extract size at which to emit directly from the forked PipeServer + * @return the emit strategy configuration */ - public long getDirectEmitThresholdBytes() { - return directEmitThresholdBytes; + public EmitStrategyConfig getEmitStrategy() { + return emitStrategy; } - public void setDirectEmitThresholdBytes(long directEmitThresholdBytes) { - this.directEmitThresholdBytes = directEmitThresholdBytes; + /** + * Set the emit strategy configuration. 
+ * + * @param emitStrategy the emit strategy configuration + */ + public void setEmitStrategy(EmitStrategyConfig emitStrategy) { + this.emitStrategy = emitStrategy; } public long getSleepOnStartupTimeoutMillis() { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java index 687269b2c..57260d2e8 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/EmitHandler.java @@ -36,6 +36,8 @@ import org.apache.tika.pipes.api.PipesResult; import org.apache.tika.pipes.api.emitter.EmitKey; import org.apache.tika.pipes.api.emitter.Emitter; import org.apache.tika.pipes.api.emitter.StreamEmitter; +import org.apache.tika.pipes.core.EmitStrategy; +import org.apache.tika.pipes.core.EmitStrategyOverride; import org.apache.tika.pipes.core.PassbackFilter; import org.apache.tika.pipes.core.emitter.EmitDataImpl; import org.apache.tika.pipes.core.emitter.EmitterManager; @@ -48,12 +50,12 @@ class EmitHandler { private static final Logger LOG = LoggerFactory.getLogger(EmitHandler.class); private final MetadataFilter defaultMetadataFilter; - private final PipesWorker.EMIT_STRATEGY emitStrategy; + private final EmitStrategy emitStrategy; private final EmitterManager emitterManager; private final long directEmitThresholdBytes; - public EmitHandler(MetadataFilter defaultMetadataFilter, PipesWorker.EMIT_STRATEGY emitStrategy, EmitterManager emitterManager, long directEmitThresholdBytes) { + public EmitHandler(MetadataFilter defaultMetadataFilter, EmitStrategy emitStrategy, EmitterManager emitterManager, long directEmitThresholdBytes) { this.defaultMetadataFilter = defaultMetadataFilter; this.emitStrategy = emitStrategy; this.emitterManager = emitterManager; @@ -77,7 +79,7 @@ class EmitHandler { t.setEmitKey(emitKey); } 
EmitDataImpl emitDataTuple = new EmitDataImpl(t.getEmitKey().getEmitKey(), parseData.getMetadataList(), stack); - if (shouldEmit(embeddedDocumentBytesConfig, parseData, emitDataTuple)) { + if (shouldEmit(embeddedDocumentBytesConfig, parseData, emitDataTuple, parseContext)) { return emit(t.getId(), emitKey, embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(), parseData, stack, parseContext); } else { @@ -153,16 +155,27 @@ class EmitHandler { } - private boolean shouldEmit(EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitDataImpl emitDataTuple) { - if (emitStrategy == PipesWorker.EMIT_STRATEGY.EMIT_ALL) { + private boolean shouldEmit(EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitDataImpl emitDataTuple, ParseContext parseContext) { + EmitStrategy strategy = emitStrategy; + long thresholdBytes = directEmitThresholdBytes; + + EmitStrategyOverride overrideConfig = parseContext.get(EmitStrategyOverride.class); + if (overrideConfig != null) { + strategy = overrideConfig.getEmitStrategy(); + if (overrideConfig.getDirectEmitThresholdBytes() != null) { + thresholdBytes = overrideConfig.getDirectEmitThresholdBytes(); + } + } + + if (strategy == EmitStrategy.EMIT_ALL) { return true; } else if (embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes() && parseData.toBePackagedForStreamEmitter()) { return true; - } else if (emitStrategy == PipesWorker.EMIT_STRATEGY.PASSBACK_ALL) { + } else if (strategy == EmitStrategy.PASSBACK_ALL) { return false; - } else if (emitStrategy == PipesWorker.EMIT_STRATEGY.DYNAMIC) { - if (emitDataTuple.getEstimatedSizeBytes() >= directEmitThresholdBytes) { + } else if (strategy == EmitStrategy.DYNAMIC) { + if (emitDataTuple.getEstimatedSizeBytes() >= thresholdBytes) { return true; } } diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java index 949dcfea4..85636f0b3 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesServer.java @@ -66,6 +66,8 @@ import org.apache.tika.parser.DigestingParser; import org.apache.tika.parser.RecursiveParserWrapper; import org.apache.tika.pipes.api.FetchEmitTuple; import org.apache.tika.pipes.api.PipesResult; +import org.apache.tika.pipes.core.EmitStrategy; +import org.apache.tika.pipes.core.EmitStrategyConfig; import org.apache.tika.pipes.core.PipesClient; import org.apache.tika.pipes.core.PipesConfig; import org.apache.tika.pipes.core.emitter.EmitterManager; @@ -149,7 +151,7 @@ public class PipesServer implements AutoCloseable { private EmitterManager emitterManager; private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final ExecutorCompletionService<PipesResult> executorCompletionService = new ExecutorCompletionService<>(executorService); - private final PipesWorker.EMIT_STRATEGY emitStrategy; + private final EmitStrategy emitStrategy; public static PipesServer load(int port, Path tikaConfigPath) throws Exception { String pipesClientId = System.getProperty("pipesClientId", "unknown"); @@ -226,13 +228,7 @@ public class PipesServer implements AutoCloseable { LOG.error(msg + " Proceeding because tika.pipes.allowInvalidHeartbeat=true"); } - if (pipesConfig.getDirectEmitThresholdBytes() == 0) { - emitStrategy = PipesWorker.EMIT_STRATEGY.EMIT_ALL; - } else if (pipesConfig.getDirectEmitThresholdBytes() < 0) { - emitStrategy = PipesWorker.EMIT_STRATEGY.PASSBACK_ALL; - } else { - emitStrategy = PipesWorker.EMIT_STRATEGY.DYNAMIC; - } + emitStrategy = pipesConfig.getEmitStrategy().getType(); } @@ -330,7 +326,9 @@ public class PipesServer implements AutoCloseable { private PipesWorker 
getPipesWorker(ArrayBlockingQueue<Metadata> intermediateResult, FetchEmitTuple fetchEmitTuple, CountDownLatch countDownLatch) { FetchHandler fetchHandler = new FetchHandler(fetcherManager); ParseHandler parseHandler = new ParseHandler(detector, digester, intermediateResult, countDownLatch, autoDetectParser, rMetaParser); - EmitHandler emitHandler = new EmitHandler(defaultMetadataFilter, emitStrategy, emitterManager, pipesConfig.getDirectEmitThresholdBytes()); + Long thresholdBytes = pipesConfig.getEmitStrategy().getThresholdBytes(); + long threshold = (thresholdBytes != null) ? thresholdBytes : EmitStrategyConfig.DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES; + EmitHandler emitHandler = new EmitHandler(defaultMetadataFilter, emitStrategy, emitterManager, threshold); PipesWorker pipesWorker = new PipesWorker(fetchEmitTuple, autoDetectParser, emitterManager, fetchHandler, parseHandler, emitHandler); return pipesWorker; } @@ -455,14 +453,8 @@ public class PipesServer implements AutoCloseable { //TODO allowed named configurations in tika config this.fetcherManager = FetcherManager.load(tikaPluginManager, tikaJsonConfig); - //skip initialization of the emitters if emitting - //from the pipesserver is turned off. - if (pipesConfig.getDirectEmitThresholdBytes() > -1) { - this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig); - } else { - LOG.debug("'directEmitThresholdBytes' < 0. 
Not initializing emitters in PipesServer"); - this.emitterManager = null; - } + // Always initialize emitters to support runtime overrides via ParseContext + this.emitterManager = EmitterManager.load(tikaPluginManager, tikaJsonConfig); this.autoDetectParser = (AutoDetectParser) tikaLoader.loadAutoDetectParser(); if (autoDetectParser.getAutoDetectParserConfig() .getDigesterFactory() != null) { diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java index daf364990..8d15c92a0 100644 --- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java +++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/server/PipesWorker.java @@ -55,12 +55,6 @@ class PipesWorker implements Callable<PipesResult> { private static final Logger LOG = LoggerFactory.getLogger(PipesWorker.class); - public enum EMIT_STRATEGY { - EMIT_ALL, - PASSBACK_ALL, - DYNAMIC - } - private final FetchEmitTuple fetchEmitTuple; private final AutoDetectParser autoDetectParser; private final EmitterManager emitterManager; diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-class.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-class.json index 51e22b0ad..43f834938 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-class.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-class.json @@ -9,8 +9,13 @@ "pipes": { "numClients": 1, "timeoutMillis": 5000, - "forkedJvmArgs": ["-Xmx256m"], - "directEmitThresholdBytes": 1000000 + "forkedJvmArgs": [ + "-Xmx256m" + ], + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 1000000 + } }, "plugin-roots": "PLUGINS_PATHS" } diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-java-path.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-java-path.json index cf7f051c2..8b623dfec 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-java-path.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-java-path.json @@ -10,8 +10,13 @@ "numClients": 1, "timeoutMillis": 5000, "javaPath": "thisIsntJava", - "forkedJvmArgs": ["-Xmx256m"], - "directEmitThresholdBytes": 1000000 + "forkedJvmArgs": [ + "-Xmx256m" + ], + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 1000000 + } }, "plugin-roots": "PLUGINS_PATHS" } diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-jvm-args.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-jvm-args.json index 5c63904cd..06e609ae8 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-jvm-args.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-bad-jvm-args.json @@ -9,8 +9,14 @@ "pipes": { "numClients": 1, "timeoutMillis": 5000, - "forkedJvmArgs": ["-Xmx256m", "-ThisIsntAThing"], - "directEmitThresholdBytes": 1000000 + "forkedJvmArgs": [ + "-Xmx256m", + "-ThisIsntAThing" + ], + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 1000000 + } }, "plugin-roots": "PLUGINS_PATHS" } diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-basic.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-basic.json index a8fbe4f10..401d9cee6 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-basic.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-basic.json @@ -43,7 +43,10 @@ "timeoutMillis": 5000, 
"emitIntermediateResults": EMIT_INTERMEDIATE_RESULTS, "forkedJvmArgs": ["-Xmx512m"], - "directEmitThresholdBytes": 1000000 + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 1000000 + } }, "auto-detect-parser": { "spoolToDisk": 1000000, diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-crashing-detector.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-crashing-detector.json index 61fcd5032..9050b06bd 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-crashing-detector.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-crashing-detector.json @@ -19,11 +19,18 @@ "pipes": { "numClients": 1, "timeoutMillis": 5000, - "forkedJvmArgs": ["-Xmx256m"], - "directEmitThresholdBytes": 1000000 + "forkedJvmArgs": [ + "-Xmx256m" + ], + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 1000000 + } }, "detectors": [ - {"org.apache.tika.pipes.core.CrashingDetector": {}} + { + "org.apache.tika.pipes.core.CrashingDetector": {} + } ], "plugin-roots": "PLUGINS_PATHS" } diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-emit-all.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-emit-all.json index 6389562eb..af1652cd0 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-emit-all.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-emit-all.json @@ -19,8 +19,12 @@ "pipes": { "numClients": 1, "timeoutMillis": 60000, - "forkedJvmArgs": ["-Xmx256m"], - "directEmitThresholdBytes": 0 + "forkedJvmArgs": [ + "-Xmx256m" + ], + "emitStrategy": { + "type": "EMIT_ALL" + } }, "plugin-roots": "PLUGINS_PATHS" } diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-passback.json 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-passback.json index 3f81bdd19..8ab503da5 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-passback.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-passback.json @@ -43,7 +43,9 @@ "timeoutMillis": 5000, "emitIntermediateResults": EMIT_INTERMEDIATE_RESULTS, "forkedJvmArgs": ["-Xmx512m"], - "directEmitThresholdBytes": 0 + "emitStrategy": { + "type": "EMIT_ALL" + } }, "auto-detect-parser": { "spoolToDisk": 1000000, diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-truncate.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-truncate.json index e13838e1a..9ae654e01 100644 --- a/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-truncate.json +++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/configs/tika-config-truncate.json @@ -43,7 +43,10 @@ "timeoutMillis": 5000, "emitIntermediateResults": EMIT_INTERMEDIATE_RESULTS, "forkedJvmArgs": ["-Xmx512m"], - "directEmitThresholdBytes": 1000000 + "emitStrategy": { + "type": "DYNAMIC", + "thresholdBytes": 1000000 + } }, "auto-detect-parser": { "spoolToDisk": 1000000, diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java index 8c1530511..45c70df52 100644 --- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java +++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/PipesResource.java @@ -39,6 +39,8 @@ import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; import org.apache.tika.pipes.api.FetchEmitTuple; import org.apache.tika.pipes.api.PipesResult; +import 
org.apache.tika.pipes.core.EmitStrategy; +import org.apache.tika.pipes.core.EmitStrategyConfig; import org.apache.tika.pipes.core.PipesConfig; import org.apache.tika.pipes.core.PipesException; import org.apache.tika.pipes.core.PipesParser; @@ -55,13 +57,12 @@ public class PipesResource { public PipesResource(java.nio.file.Path tikaConfig) throws TikaConfigException, IOException { TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(tikaConfig); PipesConfig pipesConfig = PipesConfig.load(tikaJsonConfig, tikaConfig); - //this has to be zero. everything must be emitted through the PipesServer - long maxEmit = pipesConfig.getDirectEmitThresholdBytes(); - if (maxEmit != 0) { - pipesConfig.setDirectEmitThresholdBytes(0); - if (maxEmit != PipesConfig.DEFAULT_DIRECT_EMIT_THRESHOLD_BYTES) { - LOG.warn("resetting max for emit batch to 0"); + // Everything must be emitted through the PipesServer (EMIT_ALL strategy) + if (pipesConfig.getEmitStrategy().getType() != EmitStrategy.EMIT_ALL) { + if (pipesConfig.getEmitStrategy().getType() != EmitStrategyConfig.DEFAULT_EMIT_STRATEGY) { + LOG.warn("resetting emit strategy to EMIT_ALL for pipes endpoint"); } + pipesConfig.setEmitStrategy(new EmitStrategyConfig(EmitStrategy.EMIT_ALL)); } this.pipesParser = new PipesParser(pipesConfig); } diff --git a/tika-server/tika-server-core/src/test/resources/configs/cxf-test-base-template.json b/tika-server/tika-server-core/src/test/resources/configs/cxf-test-base-template.json index 475441885..28b89934f 100644 --- a/tika-server/tika-server-core/src/test/resources/configs/cxf-test-base-template.json +++ b/tika-server/tika-server-core/src/test/resources/configs/cxf-test-base-template.json @@ -40,7 +40,9 @@ "forkedJvmArgs": [ "-Xmx512m" ], - "directEmitThresholdBytes": 0 + "emitStrategy": { + "type": "EMIT_ALL" + } }, "auto-detect-parser": { "spoolToDisk": 1000000, diff --git a/tika-server/tika-server-standard/src/test/resources/configs/cxf-test-base-template.json 
b/tika-server/tika-server-standard/src/test/resources/configs/cxf-test-base-template.json index 475441885..28b89934f 100644 --- a/tika-server/tika-server-standard/src/test/resources/configs/cxf-test-base-template.json +++ b/tika-server/tika-server-standard/src/test/resources/configs/cxf-test-base-template.json @@ -40,7 +40,9 @@ "forkedJvmArgs": [ "-Xmx512m" ], - "directEmitThresholdBytes": 0 + "emitStrategy": { + "type": "EMIT_ALL" + } }, "auto-detect-parser": { "spoolToDisk": 1000000,
