This is an automated email from the ASF dual-hosted git repository.

rec pushed a commit to branch bugfix/UIMA-6391-CpePipeline-should-kill-CPE-if-reader-throws-exception
in repository https://gitbox.apache.org/repos/asf/uima-uimafit.git

commit 05c4d371ee4503a03b5d0d50f044fefa6e97eca9
Author: Richard Eckart de Castilho <[email protected]>
AuthorDate: Mon Feb 21 13:52:35 2022 +0100

    [UIMA-6391] CpePipeline should kill CPE if reader throws exception
    
    - Changed CpePipeline to kill engine when any exception is seen
    - Added test
---
 uimafit-core/pom.xml                               |   8 +-
 uimafit-cpe/pom.xml                                |  13 +-
 .../java/org/apache/uima/fit/cpe/CpePipeline.java  |  46 ++++---
 .../fit/cpe/CpePipelineFailureHandlingTest.java    |  88 ++++++++++++
 .../org/apache/uima/fit/cpe/CpePipelineTest.java   | 153 +++++++++------------
 5 files changed, 198 insertions(+), 110 deletions(-)

diff --git a/uimafit-core/pom.xml b/uimafit-core/pom.xml
index 0a42580..1f34788 100644
--- a/uimafit-core/pom.xml
+++ b/uimafit-core/pom.xml
@@ -17,7 +17,9 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <artifactId>uimafit-core</artifactId>
   <packaging>jar</packaging>
@@ -79,8 +81,8 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <scope>test</scope>
-    </dependency>    
-    <dependency>    
+    </dependency>
+    <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
diff --git a/uimafit-cpe/pom.xml b/uimafit-cpe/pom.xml
index 9295af3..d00a15b 100644
--- a/uimafit-cpe/pom.xml
+++ b/uimafit-cpe/pom.xml
@@ -17,7 +17,9 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.uima</groupId>
@@ -41,11 +43,16 @@
       <groupId>org.apache.uima</groupId>
       <artifactId>uimaj-core</artifactId>
     </dependency>
-    
+
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <scope>test</scope>
-    </dependency>    
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java b/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java
index 30bba23..a488edc 100644
--- a/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java
+++ b/uimafit-cpe/src/main/java/org/apache/uima/fit/cpe/CpePipeline.java
@@ -127,33 +127,38 @@ public final class CpePipeline {
     CollectionProcessingEngine engine = builder.createCpe(status);
 
     engine.process();
-    try {
+    while (status.isProcessing) {
       synchronized (status) {
-        while (status.isProcessing) {
+        try {
           status.wait();
+        } catch (InterruptedException e) {
+          // Do nothing
         }
-      }
-    } catch (InterruptedException e) {
-      // Do nothing
-    }
 
-    if (status.exceptions.size() > 0) {
-      throw new AnalysisEngineProcessException(status.exceptions.get(0));
+        if (status.exceptions.size() > 0) {
+          if (engine.isProcessing()) {
+            engine.kill();
+          }
+
+          throw new AnalysisEngineProcessException(status.exceptions.get(0));
+        }
+      }
     }
   }
-  
+
   private static boolean mayContainCasMultiplier(final AnalysisEngineDescription desc) {
     if (desc.isPrimitive()) {
-      return desc.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed();
+      return desc.getAnalysisEngineMetaData().getOperationalProperties()
+              .isMultipleDeploymentAllowed();
     }
-    
+
     for (MetaDataObject mdo : desc.getDelegateAnalysisEngineSpecifiersWithImports().values()) {
       if (mdo instanceof Import) {
         // The imported delegate might be a CAS multiplier, but we cannot really tell without
         // risking an exception. So let's just assume it does.
         return true;
       }
-      
+
       if (mdo instanceof AnalysisEngineDescription) {
         AnalysisEngineDescription aed = (AnalysisEngineDescription) mdo;
         if (aed.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes()) {
@@ -161,7 +166,7 @@ public final class CpePipeline {
         }
       }
     }
-    
+
     return false;
   }
 
@@ -176,15 +181,18 @@ public final class CpePipeline {
       if (arg1.isException()) {
         for (Exception e : arg1.getExceptions()) {
           exceptions.add(e);
+          synchronized (this) {
+            notify();
+          }
         }
       }
     }
 
     @Override
     public void aborted() {
-      synchronized (this) {
-        if (isProcessing) {
-          isProcessing = false;
+      if (isProcessing) {
+        isProcessing = false;
+        synchronized (this) {
           notify();
         }
       }
@@ -197,9 +205,9 @@ public final class CpePipeline {
 
     @Override
     public void collectionProcessComplete() {
-      synchronized (this) {
-        if (isProcessing) {
-          isProcessing = false;
+      if (isProcessing) {
+        isProcessing = false;
+        synchronized (this) {
           notify();
         }
       }
diff --git a/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineFailureHandlingTest.java b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineFailureHandlingTest.java
new file mode 100644
index 0000000..cc63687
--- /dev/null
+++ b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineFailureHandlingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.fit.cpe;
+
+import static org.apache.uima.fit.cpe.CpePipeline.runPipeline;
+import static org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription;
+import static org.apache.uima.fit.factory.CollectionReaderFactory.createReaderDescription;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
+import org.apache.uima.collection.CollectionException;
+import org.apache.uima.fit.component.JCasAnnotator_ImplBase;
+import org.apache.uima.fit.component.JCasCollectionReader_ImplBase;
+import org.apache.uima.fit.descriptor.ConfigurationParameter;
+import org.apache.uima.jcas.JCas;
+import org.apache.uima.util.Progress;
+import org.junit.Test;
+
+public class CpePipelineFailureHandlingTest {
+  private static AtomicInteger processed = new AtomicInteger(0);
+  
+  @Test
+  public void test() throws Exception {
+    int failAfter = 50;
+    try {
+      runPipeline( //
+              createReaderDescription(Reader.class, //
+                      "failAfter", failAfter),
+              createEngineDescription(Annotator.class));
+    } catch (Exception e) {
+      // Ignore
+    }
+
+    assertThat(processed.get())
+      .as("CPE stopped processing after reader threw an exception")
+      .isEqualTo(failAfter);
+  }
+
+  public static class Reader extends JCasCollectionReader_ImplBase {
+
+    @ConfigurationParameter
+    private int failAfter;
+
+    @Override
+    public Progress[] getProgress() {
+      return null;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException, CollectionException {
+      return processed.get() < failAfter * 2;
+    }
+
+    @Override
+    public void getNext(JCas jCas) throws IOException, CollectionException {
+      if (processed.incrementAndGet() >= failAfter) {
+        throw new CollectionException();
+      }
+    }
+  }
+
+  public static class Annotator extends JCasAnnotator_ImplBase {
+
+    @Override
+    public void process(JCas jCas) throws AnalysisEngineProcessException {
+      // Nothing to do
+    }
+  }
+}
diff --git a/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java
index 73a904d..dac0f75 100644
--- a/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java
+++ b/uimafit-cpe/src/test/java/org/apache/uima/fit/cpe/CpePipelineTest.java
@@ -18,103 +18,86 @@
  */
 package org.apache.uima.fit.cpe;
 
+import static org.apache.uima.fit.cpe.CpePipeline.runPipeline;
+import static org.apache.uima.fit.factory.AnalysisEngineFactory.createEngineDescription;
+import static org.apache.uima.fit.factory.CollectionReaderFactory.createReaderDescription;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
-import junit.framework.Assert;
-
 import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.cas.TypeSystem;
 import org.apache.uima.collection.CollectionException;
 import org.apache.uima.fit.component.JCasAnnotator_ImplBase;
 import org.apache.uima.fit.component.JCasCollectionReader_ImplBase;
-import org.apache.uima.fit.factory.AnalysisEngineFactory;
-import org.apache.uima.fit.factory.CollectionReaderFactory;
 import org.apache.uima.jcas.JCas;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Progress;
 import org.junit.Test;
 
-/**
- */
-public class CpePipelineTest
-{
-       @Test
-       public void test()
-               throws Exception
-       {
-               CpePipeline.runPipeline(CollectionReaderFactory.createReaderDescription(Reader.class),
-                               AnalysisEngineFactory.createEngineDescription(Annotator.class),
-                               AnalysisEngineFactory.createEngineDescription(Writer.class));
-               Assert.assertEquals(MARKER, Writer.MARKER_SEEN);
-       }
-
-       public static final String TEXT = "Some text";
-       public static final String MARKER = "annotator has seen this document";
-
-       public static class Reader
-               extends JCasCollectionReader_ImplBase
-       {
-
-               private int size = 1;
-
-               private int current = 0;
-
-               private boolean initTypeSystemCalled = false;
-
-               @Override
-               public void typeSystemInit(TypeSystem aTypeSystem)
-                       throws ResourceInitializationException
-               {
-                       initTypeSystemCalled = true;
-               }
-
-               public Progress[] getProgress()
-               {
-                       return null;
-               }
-
-               public boolean hasNext()
-                       throws IOException, CollectionException
-               {
-                       assertTrue("typeSystemInit() has not been called", initTypeSystemCalled);
-                       return this.current < this.size;
-               }
-
-               @Override
-               public void getNext(JCas jCas)
-                       throws IOException, CollectionException
-               {
-                       jCas.setDocumentText(TEXT);
-                       this.current += 1;
-               }
-
-       }
-
-       public static class Annotator
-               extends JCasAnnotator_ImplBase
-       {
-
-               @Override
-               public void process(JCas jCas)
-                       throws AnalysisEngineProcessException
-               {
-                       jCas.setDocumentLanguage(MARKER);
-               }
-       }
-
-       public static class Writer
-               extends JCasAnnotator_ImplBase
-       {
-
-               public static String MARKER_SEEN;
-
-               @Override
-               public void process(JCas jCas)
-                       throws AnalysisEngineProcessException
-               {
-                       MARKER_SEEN = jCas.getDocumentLanguage();
-               }
-       }
+public class CpePipelineTest {
+  @Test
+  public void test() throws Exception {
+    runPipeline( //
+            createReaderDescription(Reader.class), 
+            createEngineDescription(Annotator.class),
+            createEngineDescription(Writer.class));
+    
+    assertThat(Writer.MARKER_SEEN).isEqualTo(MARKER);
+  }
+
+  public static final String TEXT = "Some text";
+
+  public static final String MARKER = "annotator has seen this document";
+
+  public static class Reader extends JCasCollectionReader_ImplBase {
+
+    private int size = 1;
+
+    private int current = 0;
+
+    private boolean initTypeSystemCalled = false;
+
+    @Override
+    public void typeSystemInit(TypeSystem aTypeSystem) throws ResourceInitializationException {
+      initTypeSystemCalled = true;
+    }
+
+    @Override
+    public Progress[] getProgress() {
+      return null;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException, CollectionException {
+      assertTrue("typeSystemInit() has not been called", initTypeSystemCalled);
+      return this.current < this.size;
+    }
+
+    @Override
+    public void getNext(JCas jCas) throws IOException, CollectionException {
+      jCas.setDocumentText(TEXT);
+      this.current += 1;
+    }
+
+  }
+
+  public static class Annotator extends JCasAnnotator_ImplBase {
+
+    @Override
+    public void process(JCas jCas) throws AnalysisEngineProcessException {
+      jCas.setDocumentLanguage(MARKER);
+    }
+  }
+
+  public static class Writer extends JCasAnnotator_ImplBase {
+
+    public static String MARKER_SEEN;
+
+    @Override
+    public void process(JCas jCas) throws AnalysisEngineProcessException {
+      MARKER_SEEN = jCas.getDocumentLanguage();
+    }
+  }
 }

Reply via email to