This is an automated email from the ASF dual-hosted git repository. rec pushed a commit to branch bugfix/UIMA-6391-CpePipeline-should-kill-CPE-if-reader-throws-exception in repository https://gitbox.apache.org/repos/asf/uima-uimafit.git
commit 05c4d371ee4503a03b5d0d50f044fefa6e97eca9 Author: Richard Eckart de Castilho <[email protected]> AuthorDate: Mon Feb 21 13:52:35 2022 +0100 [UIMA-6391] CpePipeline should kill CPE if reader throws exception - Changed CpePipeline to kill engine when any exception is seen - Added test --- uimafit-core/pom.xml | 8 +- uimafit-cpe/pom.xml | 13 +- .../java/org/apache/uima/fit/cpe/CpePipeline.java | 46 ++++--- .../fit/cpe/CpePipelineFailureHandlingTest.java | 88 ++++++++++++ .../org/apache/uima/fit/cpe/CpePipelineTest.java | 153 +++++++++------------ 5 files changed, 198 insertions(+), 110 deletions(-) diff --git a/uimafit-core/pom.xml b/uimafit-core/pom.xml index 0a42580..1f34788 100644 --- a/uimafit-core/pom.xml +++ b/uimafit-core/pom.xml @@ -17,7 +17,9 @@ specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>uimafit-core</artifactId> <packaging>jar</packaging> @@ -79,8 +81,8 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <scope>test</scope> - </dependency> - <dependency> + </dependency> + <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <scope>test</scope> diff --git a/uimafit-cpe/pom.xml b/uimafit-cpe/pom.xml index 9295af3..d00a15b 100644 --- a/uimafit-cpe/pom.xml +++ b/uimafit-cpe/pom.xml @@ -17,7 +17,9 @@ specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.uima</groupId> @@ -41,11 +43,16 @@ <groupId>org.apache.uima</groupId> <artifactId>uimaj-core</artifactId> </dependency> - + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <scope>test</scope> - </dependency> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java b/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java index 30bba23..a488edc 100644 --- a/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java +++ b/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java @@ -127,33 +127,38 @@ public final class CpePipeline { CollectionProcessingEngine engine = builder.createCpe(status); engine.process(); - try { + while (status.isProcessing) { synchronized (status) { - while (status.isProcessing) { + try { status.wait(); + } catch (InterruptedException e) { + // Do nothing } - } - } catch (InterruptedException e) { - // Do nothing - } - if (status.exceptions.size() > 0) { - throw new AnalysisEngineProcessException(status.exceptions.get(0)); + if (status.exceptions.size() > 0) { + if (engine.isProcessing()) { + engine.kill(); + } + + throw new AnalysisEngineProcessException(status.exceptions.get(0)); + } + } } } - + private static boolean mayContainCasMultiplier(final AnalysisEngineDescription desc) { if (desc.isPrimitive()) { - return desc.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed(); + return desc.getAnalysisEngineMetaData().getOperationalProperties() + .isMultipleDeploymentAllowed(); } - + for (MetaDataObject mdo : desc.getDelegateAnalysisEngineSpecifiersWithImports().values()) { if (mdo instanceof Import) { // The imported delegate might be a CAS multiplier, but we cannot really tell without // risking an exception. So let's just assume it does. return true; } - + if (mdo instanceof AnalysisEngineDescription) { AnalysisEngineDescription aed = (AnalysisEngineDescription) mdo; if (aed.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes()) { @@ -161,7 +166,7 @@ public final class CpePipeline { } } } - + return false; } @@ -176,15 +181,18 @@ public final class CpePipeline { if (arg1.isException()) { for (Exception e : arg1.getExceptions()) { exceptions.add(e); + synchronized (this) { + notify(); + } } } } @Override public void aborted() { - synchronized (this) { - if (isProcessing) { - isProcessing = false; + if (isProcessing) { + isProcessing = false; + synchronized (this) { notify(); } } @@ -197,9 +205,9 @@ public final class CpePipeline { @Override public void collectionProcessComplete() { - synchronized (this) { - if (isProcessing) { - isProcessing = false; + if (isProcessing) { + isProcessing = false; + synchronized (this) { notify(); } } diff --git a/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineFailureHandlingTest.java b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineFailureHandlingTest.java new file mode 100644 index 0000000..cc63687 --- /dev/null +++ b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineFailureHandlingTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.uima.fit.cpe; + +import static org.apache.uima.fit.cpe.CpePipeline.runPipeline; +import static org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription; +import static org.apache.uima.fit.factory.CollectionReaderFactory.createReaderDescription; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.uima.analysis_engine.AnalysisEngineProcessException; +import org.apache.uima.collection.CollectionException; +import org.apache.uima.fit.component.JCasAnnotator_ImplBase; +import org.apache.uima.fit.component.JCasCollectionReader_ImplBase; +import org.apache.uima.fit.descriptor.ConfigurationParameter; +import org.apache.uima.jcas.JCas; +import org.apache.uima.util.Progress; +import org.junit.Test; + +public class CpePipelineFailureHandlingTest { + private static AtomicInteger processed = new AtomicInteger(0); + + @Test + public void test() throws Exception { + int failAfter = 50; + try { + runPipeline( // + createReaderDescription(Reader.class, // + "failAfter", failAfter), + createEngineDescription(Annotator.class)); + } catch (Exception e) { + // Ignore + } + + assertThat(processed.get()) + .as("CPE stopped processing after reader threw an exception") + .isEqualTo(failAfter); + } + + public static class Reader extends JCasCollectionReader_ImplBase { + + @ConfigurationParameter + private int failAfter; + + @Override + public Progress[] getProgress() { + return null; + } + + @Override + public boolean hasNext() throws IOException, CollectionException { + return processed.get() < failAfter * 2; + } + + @Override + public void getNext(JCas jCas) throws IOException, CollectionException { + if (processed.incrementAndGet() >= failAfter) { + throw new CollectionException(); + } + } + } + + public static class Annotator extends JCasAnnotator_ImplBase { + + @Override + public void process(JCas jCas) throws AnalysisEngineProcessException { + // Nothing to do + } + } +} diff --git a/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java index 73a904d..dac0f75 100644 --- a/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java +++ b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java @@ -18,103 +18,86 @@ */ package org.apache.uima.fit.cpe; +import static org.apache.uima.fit.cpe.CpePipeline.runPipeline; +import static org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription; +import static org.apache.uima.fit.factory.CollectionReaderFactory.createReaderDescription; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; import java.io.IOException; -import junit.framework.Assert; - import org.apache.uima.analysis_engine.AnalysisEngineProcessException; import org.apache.uima.cas.TypeSystem; import org.apache.uima.collection.CollectionException; import org.apache.uima.fit.component.JCasAnnotator_ImplBase; import org.apache.uima.fit.component.JCasCollectionReader_ImplBase; -import org.apache.uima.fit.factory.AnalysisEngineFactory; -import org.apache.uima.fit.factory.CollectionReaderFactory; import org.apache.uima.jcas.JCas; import org.apache.uima.resource.ResourceInitializationException; import org.apache.uima.util.Progress; import org.junit.Test; -/** - */ -public class CpePipelineTest -{ - @Test - public void test() - throws Exception - { - CpePipeline.runPipeline(CollectionReaderFactory.createReaderDescription(Reader.class), - AnalysisEngineFactory.createEngineDescription(Annotator.class), - AnalysisEngineFactory.createEngineDescription(Writer.class)); - Assert.assertEquals(MARKER, Writer.MARKER_SEEN); - } - - public static final String TEXT = "Some text"; - public static final String MARKER = "annotator has seen this document"; - - public static class Reader - extends JCasCollectionReader_ImplBase - { - - private int size = 1; - - private int current = 0; - - private boolean initTypeSystemCalled = false; - - @Override - public void typeSystemInit(TypeSystem aTypeSystem) - throws ResourceInitializationException - { - initTypeSystemCalled = true; - } - - public Progress[] getProgress() - { - return null; - } - - public boolean hasNext() - throws IOException, CollectionException - { - assertTrue("typeSystemInit() has not been called", initTypeSystemCalled); - return this.current < this.size; - } - - @Override - public void getNext(JCas jCas) - throws IOException, CollectionException - { - jCas.setDocumentText(TEXT); - this.current += 1; - } - - } - - public static class Annotator - extends JCasAnnotator_ImplBase - { - - @Override - public void process(JCas jCas) - throws AnalysisEngineProcessException - { - jCas.setDocumentLanguage(MARKER); - } - } - - public static class Writer - extends JCasAnnotator_ImplBase - { - - public static String MARKER_SEEN; - - @Override - public void process(JCas jCas) - throws AnalysisEngineProcessException - { - MARKER_SEEN = jCas.getDocumentLanguage(); - } - } +public class CpePipelineTest { + @Test + public void test() throws Exception { + runPipeline( // + createReaderDescription(Reader.class), + createEngineDescription(Annotator.class), + createEngineDescription(Writer.class)); + + assertThat(Writer.MARKER_SEEN).isEqualTo(MARKER); + } + + public static final String TEXT = "Some text"; + + public static final String MARKER = "annotator has seen this document"; + + public static class Reader extends JCasCollectionReader_ImplBase { + + private int size = 1; + + private int current = 0; + + private boolean initTypeSystemCalled = false; + + @Override + public void typeSystemInit(TypeSystem aTypeSystem) throws ResourceInitializationException { + initTypeSystemCalled = true; + } + + @Override + public Progress[] getProgress() { + return null; + } + + @Override + public boolean hasNext() throws IOException, CollectionException { + assertTrue("typeSystemInit() has not been called", initTypeSystemCalled); + return this.current < this.size; + } + + @Override + public void getNext(JCas jCas) throws IOException, CollectionException { + jCas.setDocumentText(TEXT); + this.current += 1; + } + + } + + public static class Annotator extends JCasAnnotator_ImplBase { + + @Override + public void process(JCas jCas) throws AnalysisEngineProcessException { + jCas.setDocumentLanguage(MARKER); + } + } + + public static class Writer extends JCasAnnotator_ImplBase { + + public static String MARKER_SEEN; + + @Override + public void process(JCas jCas) throws AnalysisEngineProcessException { + MARKER_SEEN = jCas.getDocumentLanguage(); + } + } }
