This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tika.git
commit b7499a3ef2276ebc0c910d27b198f46d0b2d2f8d Author: tballison <talli...@apache.org> AuthorDate: Wed Nov 16 11:44:34 2022 -0500 improve robustness of AsyncProcessor --- .../apache/tika/pipes/async/AsyncProcessor.java | 64 +++++++++++----------- .../apache/tika/async/cli/TikaAsyncCLITest.java | 44 +++++++++++++++ .../src/test/resources/tika-config-broken.xml | 36 ++++++++++++ 3 files changed, 113 insertions(+), 31 deletions(-) diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java index e86aa71b3..476e4df58 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java @@ -82,41 +82,43 @@ public class AsyncProcessor implements Closeable { asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1); this.executorCompletionService = new ExecutorCompletionService<>(executorService); - if (!tikaConfigPath.toAbsolutePath() - .equals(asyncConfig.getTikaConfig().toAbsolutePath())) { - LOG.warn("TikaConfig for AsyncProcessor ({}) is different " + - "from TikaConfig for workers ({}). If this is intended," + - " please ignore this warning.", - tikaConfigPath.toAbsolutePath(), - asyncConfig.getTikaConfig().toAbsolutePath() - ); - } - this.executorCompletionService.submit(() -> { - while (true) { - try { - Thread.sleep(500); - checkActive(); - } catch (InterruptedException e) { - return WATCHER_FUTURE_CODE; + try { + if (!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfig().toAbsolutePath())) { + LOG.warn("TikaConfig for AsyncProcessor ({}) is different " + + "from TikaConfig for workers ({}). If this is intended," + + " please ignore this warning.", tikaConfigPath.toAbsolutePath(), + asyncConfig.getTikaConfig().toAbsolutePath()); + } + this.executorCompletionService.submit(() -> { + while (true) { + try { + Thread.sleep(500); + checkActive(); + } catch (InterruptedException e) { + return WATCHER_FUTURE_CODE; + } } + }); + //this is run in a daemon thread + if (pipesIterator != null && (pipesIterator instanceof TotalCounter)) { + LOG.debug("going to total counts"); + startCounter((TotalCounter) pipesIterator); } - }); - //this is run in a daemon thread - if (pipesIterator != null && - (pipesIterator instanceof TotalCounter)) { - LOG.debug("going to total counts"); - startCounter((TotalCounter) pipesIterator); - } - for (int i = 0; i < asyncConfig.getNumClients(); i++) { - executorCompletionService.submit(new FetchEmitWorker(asyncConfig, fetchEmitTuples, - emitData)); - } + for (int i = 0; i < asyncConfig.getNumClients(); i++) { + executorCompletionService.submit( + new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData)); + } - EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig()); - for (int i = 0; i < asyncConfig.getNumEmitters(); i++) { - executorCompletionService.submit(new AsyncEmitter(asyncConfig, emitData, - emitterManager)); + EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig()); + for (int i = 0; i < asyncConfig.getNumEmitters(); i++) { + executorCompletionService.submit( + new AsyncEmitter(asyncConfig, emitData, emitterManager)); + } + } catch (Exception e) { + executorService.shutdownNow(); + asyncConfig.getPipesReporter().close(); + throw e; } } diff --git a/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java new file mode 100644 index 000000000..a06f71c39 --- /dev/null +++ b/tika-pipes/tika-async-cli/src/test/java/org/apache/tika/async/cli/TikaAsyncCLITest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.async.cli; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.junit.jupiter.api.Test; + +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.utils.ProcessUtils; + +public class TikaAsyncCLITest { + @Test + public void testCrash() throws Exception { + Path config = getPath("/tika-config-broken.xml"); + assertThrows(TikaConfigException.class, + () -> TikaAsyncCLI.main( + new String[] { + ProcessUtils.escapeCommandLine(config.toAbsolutePath().toString()) + }) + ); + } + + private Path getPath(String file) throws Exception { + return Paths.get(this.getClass().getResource(file).toURI()); + } +} diff --git a/tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml b/tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml new file mode 100644 index 000000000..75b10da92 --- /dev/null +++ b/tika-pipes/tika-async-cli/src/test/resources/tika-config-broken.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<properties> + <fetchers> + <fetcher class="org.apache.tika.pipes.fetcher.s3.S3Fetcher"> + <params> + <name>s3</name> + <region>us-east-1</region> + <profile><!-- fill in here --></profile> + </params> + </fetcher> + </fetchers> + <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator"> + <params> + <fetcherName>fs</fetcherName> + <basePath>basePath</basePath> + </params> + </pipesIterator> +</properties> \ No newline at end of file