Author: cwiklik
Date: Tue Jul 24 17:26:39 2018
New Revision: 1836574

URL: http://svn.apache.org/viewvc?rev=1836574&view=rev
Log:
UIMA-5843 - added support for error window

Added:
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
Modified:
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
 Tue Jul 24 17:26:39 2018
@@ -44,8 +44,23 @@ public class ServiceConfiguration {
        private String ccOverrides;
        private String cmOverrides;
        private String aeOverrides;
+       private String maxErrors;
+       private String errorWindowSize;
        private ClassLoader sysCL=null;
 
+       public String getMaxErrors() {
+               return maxErrors;
+       }
+       public void setMaxErrors(String maxErrors) {
+               this.maxErrors = maxErrors;
+       }
+
+       public String getErrorWindowSize() {
+               return errorWindowSize;
+       }
+       public void setErrorWindowSize(String errorWindowSize) {
+               this.errorWindowSize = errorWindowSize;
+       }
        public ClassLoader getSysCL() {
                return sysCL;
        }
@@ -217,6 +232,9 @@ public class ServiceConfiguration {
                clientURL = System.getProperty("ducc.deploy.JdURL");
                threadCount = System.getProperty("ducc.deploy.JpThreadCount");
                serviceType = System.getProperty("ducc.deploy.service.type");
+           maxErrors = 
System.getProperty("ducc.deploy.service.error.threshold");
+           errorWindowSize = 
System.getProperty("ducc.deploy.service.error.window");
+               
                jpType = System.getProperty("ducc.deploy.JpType");
                assignedJmxPort = System.getProperty("ducc.jmx.port");
                customRegistryClass = 
System.getProperty("ducc.deploy.registry.class");

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
 Tue Jul 24 17:26:39 2018
@@ -103,7 +103,6 @@ public final class PullServiceStepBuilde
        public interface OptionalsStep {
                public OptionalsStep withScaleout(int scaleout);
                public OptionalsStep withType(String type);
-               
                public BuildStep withOptionalsDone();
        }
        public interface BuildStep {

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
 Tue Jul 24 17:26:39 2018
@@ -20,59 +20,19 @@
 package org.apache.uima.ducc.ps.service.errors.builtin;
 
 import org.apache.uima.ducc.ps.service.IServiceComponent;
-import org.apache.uima.ducc.ps.service.Lifecycle;
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
 import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
 
 
 public class DefaultErrorHandler implements IServiceErrorHandler {
-       private int frameWorkErrorLimit=-1; // no limit
-       private Action actionOnProcessError;
-       private int windowSize = 0;
-       private Action actionOnExceedsWindowSize;
-       private Lifecycle lifecycleMonitor;
-       private long errorCount=0;
        
-       public DefaultErrorHandler(Action action) {
-               
-               this.actionOnProcessError = action;
-       }
-       public DefaultErrorHandler() {
-               this.actionOnProcessError = Action.TERMINATE;
-       }
-       
-       public DefaultErrorHandler withMaxFrameworkErrors(int 
maxFrameworkError) {
-               this.frameWorkErrorLimit = maxFrameworkError;
-               return this;
-       }
-       public DefaultErrorHandler withProcessErrorWindow(int errorWindow, 
Action errorAction ) {
-               this.actionOnExceedsWindowSize = errorAction;
-               this.windowSize = errorWindow;
-               return this;
-       }
-       private boolean exceedsProcessWindow() {
-               return (errorCount % windowSize == 0);
-       }
        @Override
        public Action handleProcessError(Exception e, IServiceComponent source, 
IWindowStats stats) {
-               
                return Action.TERMINATE;
        }
        @Override
        public Action handle(Exception e, IServiceComponent source) {
-               errorCount++;
-               if ( exceedsProcessWindow() ) {
-                       Thread t = new Thread( new Runnable() {
-                               public void run() {
-                                       lifecycleMonitor.stop();
-                               }
-                       });
-                       t.start();
-                       
-               }
-               
-               
-               
-               return Action.TERMINATE;        }
+               return Action.TERMINATE;        
+        }
        
 }

Added: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java?rev=1836574&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
 Tue Jul 24 17:26:39 2018
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.ps.service.errors.builtin;
+
+import java.util.Arrays;
+
+import org.apache.uima.ducc.ps.service.IServiceComponent;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+
+/** Error handler that returns TERMINATE once a configured number of process errors is reached. */
+public class WindowBasedErrorHandler implements IServiceErrorHandler  {
+       // Number of process errors that triggers termination; 0 disables the check
+       private int errorThreshold=1;
+       // Window size, in the same units as the taskCount given to exceededErrorWindow()
+       private int windowSize = 1;
+       private long errorCount=0;
+       // Task counts of the most recent errors; stays null unless build() enables the window
+       private long[] errorSequences;
+       /** Sets the error threshold and returns this handler for chaining. */
+       public WindowBasedErrorHandler withMaxFrameworkErrors(int maxFrameworkError) {
+               this.errorThreshold = maxFrameworkError;
+               return this;
+       }
+       /** Sets the window size and returns this handler for chaining. */
+       public WindowBasedErrorHandler withProcessErrorWindow(int errorWindow ) {
+               this.windowSize = errorWindow;
+               return this;
+       }
+       /** Enables window tracking when windowSize >= errorThreshold > 1; otherwise total errors are counted. */
+       public WindowBasedErrorHandler build() {
+               if (windowSize >= errorThreshold && errorThreshold > 1) {
+                       errorSequences = new long[errorThreshold - 1];
+                       Arrays.fill(errorSequences, -windowSize);  // every slot starts outside the window
+               }
+               return this;
+       }
+       /** Records one error at taskCount; returns true when errorThreshold errors fall within the window. */
+       // synchronized: errorCount and errorSequences are mutated here and the handler may be shared by threads
+       public synchronized boolean exceededErrorWindow(long taskCount) {
+               if (errorThreshold == 0) {
+                       return false;
+               }
+               ++errorCount;
+               // If no window check if total errors have REACHED the threshold
+               if (errorSequences == null) {
+                       return (errorCount >= errorThreshold);
+               }
+               // Insert in array by replacing one that is outside the window.
+               int i = errorThreshold - 1;
+               while (--i >= 0) {
+                       if (errorSequences[i] <= taskCount - windowSize) {
+                               errorSequences[i] = taskCount;
+                               return false;
+                       }
+               }
+               // If insert fails then have reached threshold.
+               // Should not be called again after returning true as may return false!
+               // But may be called again if no action specified, but then it doesn't matter.
+               return true;
+       }
+       /** Uses stats.getSuccessCount() as the task count; TERMINATE when the window is exceeded, else CONTINUE. */
+       @Override
+       public Action handleProcessError(Exception e, IServiceComponent source, IWindowStats stats) {
+               return exceededErrorWindow(stats.getSuccessCount()) ? Action.TERMINATE : Action.CONTINUE;
+       }
+       /** Always returns TERMINATE. */
+       @Override
+       public Action handle(Exception e, IServiceComponent source) {
+               return Action.TERMINATE;
+       }
+
+}

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
 Tue Jul 24 17:26:39 2018
@@ -51,4 +51,10 @@ public class CustomProcessorExample impl
                logger.log(Level.INFO,"... getScaleout() called");
                return 1;
        }
+
+       @Override
+       public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+               logger.log(Level.INFO,"... setErrorHandlerWindow() called");
+
+       }
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
 Tue Jul 24 17:26:39 2018
@@ -166,9 +166,6 @@ public class PullService implements ISer
                        // this down just before thread dies.
                        CountDownLatch stopLatch = new CountDownLatch(scaleout);
                        serviceProcessor.setScaleout(scaleout);
-//                     if ( serviceProcessor instanceof IScaleable ) {
-//                             ((IScaleable) 
serviceProcessor).setScaleout(scaleout);
-//                     }
                        // add default protocol handler
                protocolHandler =
                                           new 
DefaultServiceProtocolHandler.Builder()

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
 Tue Jul 24 17:26:39 2018
@@ -28,8 +28,11 @@ import org.apache.uima.ducc.ps.service.I
 import org.apache.uima.ducc.ps.service.ServiceConfiguration;
 import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
 import org.apache.uima.ducc.ps.service.dgen.DeployableGeneration;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
 import org.apache.uima.ducc.ps.service.errors.ServiceException;
 import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
 import org.apache.uima.ducc.ps.service.jmx.JMXAgent;
 import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
 import org.apache.uima.ducc.ps.service.processor.uima.UimaAsServiceProcessor;
@@ -71,6 +74,7 @@ public class ServiceWrapper {
         */
        private IServiceProcessor createProcessor(String 
analysisEngineDescriptorPath, String[] args) 
        throws ServiceInitializationException{
+               IServiceProcessor serviceProcessor=null;
                if ( serviceConfiguration.getCustomProcessorClass() != null ) {
                        try {
                        Class<?> clz = 
Class.forName(serviceConfiguration.getCustomProcessorClass());
@@ -78,20 +82,48 @@ public class ServiceWrapper {
                        if ( !IServiceProcessor.class.isAssignableFrom(clz) ) {
                                throw new 
ServiceInitializationException(serviceConfiguration.getCustomProcessorClass()+" 
Processor Class does not implement IServiceProcessor ");
                        }
-                       return (IServiceProcessor) clz.newInstance();
+                       serviceProcessor = (IServiceProcessor) 
clz.newInstance();
+                       int maxErrors = 0;   // used when no error threshold is configured
+                       int windowSize = 0;  // used when no error window is configured
+                       if ( serviceConfiguration.getMaxErrors() != null ) {
+                               maxErrors = Integer.parseInt(serviceConfiguration.getMaxErrors());
+                       }
+                       if ( serviceConfiguration.getErrorWindowSize() != null ) { // test the property that is parsed below, not maxErrors
+                               windowSize = Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+                       }
+                       serviceProcessor.setErrorHandlerWindow(maxErrors, windowSize);
+
                        } catch( Exception e) {
                                logger.log(Level.WARNING,"",e);
                                throw new 
ServiceInitializationException("Unable to instantiate Custom Processor from 
class:"+serviceConfiguration.getCustomProcessorClass());
                        }
                } else {
-                       if  ( "uima".equals(serviceConfiguration.getJpType() ) 
){
-                               return new 
UimaServiceProcessor(analysisEngineDescriptorPath, serviceConfiguration);
+                       if  ( "uima".equals(serviceConfiguration.getJpType() ) 
) {
+                               serviceProcessor = new 
UimaServiceProcessor(analysisEngineDescriptorPath, serviceConfiguration);
+                       
                        } else if ( 
"uima-as".equals(serviceConfiguration.getJpType()) ) {
-                               return new UimaAsServiceProcessor(args, 
serviceConfiguration);
+                               serviceProcessor = new 
UimaAsServiceProcessor(args, serviceConfiguration);
+                       
                        } else {
                                throw new RuntimeException("Invalid deployment. 
Set either -Dducc.deploy.JpType=[uima,uima-as] or provide 
-Dducc.deploy.custom.processor.class=XX where XX implements IServiceProcessor 
");
                        }
                } 
+               return serviceProcessor;
+       }
+       /** Creates the error handler from the configured error threshold and error window size. */
+       private IServiceErrorHandler getErrorHandler() {
+               int maxErrors = 1;
+               int windowSize = 1;
+               if ( serviceConfiguration.getMaxErrors() != null ) {
+                       maxErrors = 
Integer.parseInt(serviceConfiguration.getMaxErrors());
+               }
+               if ( serviceConfiguration.getErrorWindowSize() != null ) {
+                       windowSize = 
Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+               }
+               // With the defaults above (1 and 1) the handler returns TERMINATE on the 1st process error
+               return  new WindowBasedErrorHandler()
+                               .withMaxFrameworkErrors(maxErrors)
+                               .withProcessErrorWindow(windowSize).build();
        }
        /**
         * Check if AE descriptor is provided or we need to create it from parts

Added: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java?rev=1836574&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
 Tue Jul 24 17:26:39 2018
@@ -0,0 +1,50 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.metrics.builtin;
+
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+
+/** Immutable holder for the three counters exposed through IWindowStats. */
+public class ProcessWindowStats implements IWindowStats {
+       private final long errors;
+       private final long successes;
+       private final long errorsAfterLastSuccess;
+       /** Stores the three counter values exactly as given. */
+       public ProcessWindowStats(long errorCount, long successCount, long errorsSinceLastSuccess)  {
+               errors = errorCount;
+               successes = successCount;
+               errorsAfterLastSuccess = errorsSinceLastSuccess;
+       }
+       /** Returns the error count supplied at construction. */
+       @Override
+       public long getErrorCount() {
+               return errors;
+       }
+       /** Returns the success count supplied at construction. */
+       @Override
+       public long getSuccessCount() {
+               return successes;
+       }
+       /** Returns the errors-since-last-success value supplied at construction. */
+       @Override
+       public long getErrorCountSinceLastSuccess() {
+               return errorsAfterLastSuccess;
+       }
+}

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
 Tue Jul 24 17:26:39 2018
@@ -33,4 +33,6 @@ public interface IServiceProcessor exten
        public void setScaleout(int scaleout);
        
        public int getScaleout();
+       
+       public void setErrorHandlerWindow(int maxErrors, int windowSize);
 }

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
 Tue Jul 24 17:26:39 2018
@@ -20,7 +20,10 @@ package org.apache.uima.ducc.ps.service.
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
 import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
 import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
@@ -28,12 +31,32 @@ import org.apache.uima.util.Logger;
 public class AbstractServiceProcessor {
        // Map to store DuccUimaSerializer instances. Each has affinity to a 
thread
        protected Map<Long, UimaSerializer> serializerMap = new HashMap<>();
+       protected AtomicLong errorCount = new AtomicLong();
+       protected AtomicLong successCount = new AtomicLong();
+       protected AtomicLong errorCountSinceLastSuccess = new AtomicLong();
+       protected int maxErrors=1;  // default is to fail on 1st error
+       protected int windowSize=1;
+    protected int DEFAULT_INIT_DELAY=30000;
+    
+       protected IServiceErrorHandler getErrorHandler() {
+               // Builds a WindowBasedErrorHandler from the current maxErrors and windowSize fields.
+               // Concrete processors extending this class should provide a way to set them;
+               // IServiceProcessor declares setErrorHandlerWindow() for overriding the default values
+               return new WindowBasedErrorHandler().
+                               withMaxFrameworkErrors(maxErrors).
+                               withProcessErrorWindow(windowSize).build();
+
+       }
 
        protected void delay(Logger logger, long howLong) {
+               long delay = DEFAULT_INIT_DELAY;
+               if ( System.getProperty("ducc.service.init.delay") != null ) {
+                       delay = 
Long.parseLong(System.getProperty("ducc.service.init.delay").trim());
+               }
                logger.log(Level.INFO, "Wait for the initialized state to 
propagate to the SM " +
-                         "so any processing errors are not treates as 
initialization failures");
+                         "so any processing errors are not treates as 
initialization failures - delay="+delay);
                try {
-                       Thread.sleep(30000);
+                       Thread.sleep(delay);
                } catch (InterruptedException e1) {
                }
 

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
 Tue Jul 24 17:26:39 2018
@@ -41,8 +41,11 @@ import org.apache.uima.ducc.ps.service.I
 import org.apache.uima.ducc.ps.service.ServiceConfiguration;
 import org.apache.uima.ducc.ps.service.dgen.DeployableGenerator;
 import org.apache.uima.ducc.ps.service.dgen.DuccUimaReferenceByName;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
 import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+import org.apache.uima.ducc.ps.service.metrics.builtin.ProcessWindowStats;
 import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
 import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
 import org.apache.uima.ducc.ps.service.processor.IProcessResult;
@@ -62,7 +65,6 @@ public class UimaAsServiceProcessor exte
 
        Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
        // Map to store DuccUimaSerializer instances. Each has affinity to a 
thread
-//     private Map<Long, UimaSerializer> serializerMap = new HashMap<>();
        private static Object brokerInstance = null;
        private UimaAsClientWrapper uimaASClient = null;
        private String saxonURL = null;
@@ -82,13 +84,13 @@ public class UimaAsServiceProcessor exte
        private static Object platformMBeanServer;
        private ServiceConfiguration serviceConfiguration;
        private IServiceMonitor monitor;
+       private IServiceErrorHandler errorHandler;
        public volatile boolean initialized = false;
        private String[] deploymentDescriptors = null;
        private String[] ids = null;
        private String duccHome = null;
        boolean enablePerformanceBreakdownReporting = false;
        private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
-
        static {
                // try to get platform MBean Server (Java 1.5 only)
                try {
@@ -101,12 +103,12 @@ public class UimaAsServiceProcessor exte
        }
        private String[] args;
 
+
        public UimaAsServiceProcessor(String[] args, ServiceConfiguration 
serviceConfiguration) {
                this.args = args;
                this.serviceConfiguration = serviceConfiguration;
                // start a thread which will collect AE initialization state
                launchStateInitializationCollector();
-
        }
 
        @Override
@@ -118,7 +120,10 @@ public class UimaAsServiceProcessor exte
        public int getScaleout() {
                return scaleout;
        }
-
+       public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+               this.maxErrors = maxErrors;
+               this.windowSize = windowSize;
+       }
        private int generateDescriptorsAndGetScaleout(String[] args) throws 
Exception {
                deploymentDescriptors = getDescriptors(args);
                ids = new String[deploymentDescriptors.length];
@@ -157,6 +162,7 @@ public class UimaAsServiceProcessor exte
                        }
                        // Needed to resolve ${queue.name} placeholder in DD 
generated by DUCC
                        System.setProperty(queuePropertyName, endpointName);
+                       errorHandler = getErrorHandler();
 
                        // generate Spring context file once
                        synchronized (UimaAsServiceProcessor.class) {
@@ -195,7 +201,7 @@ public class UimaAsServiceProcessor exte
                                doDeploy();
                        }
                        if ( numberOfInitializedThreads.incrementAndGet() == 
scaleout ) {
-                               super.delay(logger, 30000);
+                               super.delay(logger, DEFAULT_INIT_DELAY);
                                
monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
                        }
 
@@ -232,21 +238,29 @@ public class UimaAsServiceProcessor exte
         matcher.appendTail(sb);
         return sb.toString();
        }
+
+       private CAS getCAS(String serializedTask) throws Exception {
+               CAS cas = uimaASClient.getCAS();
+               // DUCC JP services are given a serialized CAS ... others just 
the doc-text for
+               // a CAS
+               if (serviceConfiguration.getJpType() != null) {
+                       // Use thread dedicated UimaSerializer to de-serialize 
the CAS
+                       
getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
+               } else {
+                       cas.setDocumentText(serializedTask);
+                       cas.setDocumentLanguage("en");
+               }
+               return cas;
+       }
+
+       
        @Override
        public IProcessResult process(String serializedTask) {
                CAS cas = null;
                IProcessResult result;
                try {
-                       cas = uimaASClient.getCAS();
-                       // DUCC JP  services are given a serialized CAS ... 
others just the doc-text for a CAS
-                       if (serviceConfiguration.getJpType() != null) {
-                               // Use thread dedicated UimaSerializer to 
de-serialize the CAS
-                               
getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
-                       } else {
-                               cas.setDocumentText(serializedTask);
-                               cas.setDocumentLanguage("en");
-                       }
 
+                       cas = getCAS(serializedTask);
                        List<PerformanceMetrics> casMetrics = new ArrayList<>();
 
                        if (enablePerformanceBreakdownReporting) {
@@ -254,9 +268,18 @@ public class UimaAsServiceProcessor exte
 
                                try {
                                        uimaASClient.sendAndReceive(cas, 
perfMetrics);
+                                       successCount.incrementAndGet();
+                                       errorCountSinceLastSuccess.set(0);
+
                                } catch (Exception t) {
                                        logger.log(Level.WARNING, "", t);
-                                       result = new UimaProcessResult(t, 
Action.TERMINATE);
+                                       IWindowStats stats = 
+                                                       new 
ProcessWindowStats(errorCount.incrementAndGet(), 
+                                                                       
successCount.get(), 
+                                                                       
errorCountSinceLastSuccess.incrementAndGet());
+                                       Action action = 
+                                                       
errorHandler.handleProcessError(t, this, stats);
+                                       result = new UimaProcessResult(t, 
action);
                                        return result;
                                }
 
@@ -280,10 +303,21 @@ public class UimaAsServiceProcessor exte
                        } else {
                                // delegate processing to the UIMA-AS service 
and wait for a reply
                                try {
+                                       
                                        uimaASClient.sendAndReceive(cas);
+                                       successCount.incrementAndGet();
+                                       errorCountSinceLastSuccess.set(0);
+
                                } catch (Exception t) {
                                        logger.log(Level.WARNING, "", t);
-                                       result = new UimaProcessResult(t, 
Action.TERMINATE);
+                                       IWindowStats stats = 
+                                                       new 
ProcessWindowStats(errorCount.incrementAndGet(), 
+                                                                       
successCount.get(), 
+                                                                       
errorCountSinceLastSuccess.incrementAndGet());
+                                       Action action = 
+                                                       
errorHandler.handleProcessError(t, this, stats);
+
+                                       result = new UimaProcessResult(t, 
action);
                                        return result;
                                }                                               
                                 
                                PerformanceMetrics pm = new PerformanceMetrics(

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
 Tue Jul 24 17:26:39 2018
@@ -20,6 +20,7 @@ package org.apache.uima.ducc.ps.service.
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Objects;
 
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
 import org.apache.uima.ducc.ps.service.processor.IProcessResult;
@@ -47,6 +48,9 @@ public class UimaProcessResult implement
        }
        @Override
        public String getError() {
+               if ( Objects.isNull(exception)) {
+                       return null;
+               }
                StringWriter sw = new StringWriter();
                exception.printStackTrace(new PrintWriter(sw));
                return sw.toString();

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
 Tue Jul 24 17:26:39 2018
@@ -21,7 +21,6 @@ package org.apache.uima.ducc.ps.service.
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 //import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +35,8 @@ import org.apache.uima.ducc.ps.service.S
 //import org.apache.uima.ducc.ps.service.dgen.DeployableGeneration;
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
 import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+import org.apache.uima.ducc.ps.service.metrics.builtin.ProcessWindowStats;
 //import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
 import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
 import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
@@ -77,6 +78,7 @@ public class UimaServiceProcessor extend
        private ServiceConfiguration serviceConfiguration;
        private IServiceMonitor monitor;
        private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+       private IServiceErrorHandler errorHandler;
        
        static {
                // try to get platform MBean Server (Java 1.5 only)
@@ -104,8 +106,25 @@ public class UimaServiceProcessor extend
                if (serviceConfiguration.getJpType() != null) {
                  serializerMap = new HashMap<>();
                }
+               // check if max errors override has been set via -D
+               if ( serviceConfiguration.getMaxErrors() != null ) {
+                       this.maxErrors = 
Integer.parseInt(serviceConfiguration.getMaxErrors());
+               }
+               // check if error window override has been set via -D
+               if ( serviceConfiguration.getErrorWindowSize() != null ) {
+                       this.windowSize = 
Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+               }
+       }
+       /**
+        * Defines error handling parameters
+        * 
+        * @param maxErrors - maximum error threshold within an error window
+        * @param windowSize - error window size
+        */
+       public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+               this.maxErrors = maxErrors;
+               this.windowSize = windowSize;
        }
-       
        private void launchStateInitializationCollector() {
                monitor =
                                new RemoteStateObserver(serviceConfiguration, 
logger);
@@ -124,6 +143,8 @@ public class UimaServiceProcessor extend
                        logger.log(Level.FINE, "Process Thread:"+ 
Thread.currentThread().getName()+" Initializing AE");
                        
                }
+               errorHandler = getErrorHandler();
+               
                try {
                        // multiple threads may call this method. Send 
initializing state once
                        initStateLock.lockInterruptibly();
@@ -176,7 +197,7 @@ public class UimaServiceProcessor extend
                        
                }
                if ( numberOfInitializedThreads.incrementAndGet() == scaleout ) 
{
-                       super.delay(logger, 30000);
+                       super.delay(logger, DEFAULT_INIT_DELAY);
                        
monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
                }
        }
@@ -189,10 +210,6 @@ public class UimaServiceProcessor extend
                casPool = new CasPool(scaleout, analysisEngineMetadata, rm);
        }
 
-//     private UimaSerializer getUimaSerializer() {
-//             
-//             return serializerMap.get(Thread.currentThread().getId());
-//     }
        @Override
        public IProcessResult process(String serializedTask) {
                AnalysisEngine ae = null;
@@ -229,13 +246,22 @@ public class UimaServiceProcessor extend
                        // get the delta
                        List<PerformanceMetrics> casMetrics = 
                                        UimaMetricsGenerator.getDelta( 
afterAnalysis, beforeAnalysis);
+                       
+                       successCount.incrementAndGet();
+                       errorCountSinceLastSuccess.set(0);
                        return new 
UimaProcessResult(resultSerializer.serialize(casMetrics));
                } catch( Exception e ) {
                        logger.log(Level.WARNING,"",e);
-                       result = new UimaProcessResult(e, Action.TERMINATE);
+                       IWindowStats stats = 
+                                       new 
ProcessWindowStats(errorCount.incrementAndGet(), 
+                                                       successCount.get(), 
+                                                       
errorCountSinceLastSuccess.incrementAndGet());
+                       Action action = 
+                                       errorHandler.handleProcessError(e, 
this, stats);
+
+                       result = new UimaProcessResult(e, action);
                        return result;
-               }
-               finally {
+               } finally {
                        
                        if (cas != null) {
                                casPool.releaseCas(cas);
@@ -245,7 +271,7 @@ public class UimaServiceProcessor extend
 
        
        public void setErrorHandler(IServiceErrorHandler errorHandler) {
-
+               this.errorHandler = errorHandler;
        }
 
        @Override

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 Tue Jul 24 17:26:39 2018
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.ps.service.protocol.builtin;
 
 import java.io.InvalidClassException;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -237,15 +238,20 @@ public class DefaultServiceProtocolHandl
 
                                // assume success
                                Action action = Action.CONTINUE;
+                               // check if process error occurred.  
+                               String errorAsString = processResult.getError();
+
                                if (processResult.terminateProcess()) {
                                        action = Action.TERMINATE;
-                                       String errorAsString = 
processResult.getError();
+                               } else if ( Objects.isNull(errorAsString)){
+                                       // success 
+                                       
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
+                               } 
+                               if ( Objects.nonNull(errorAsString ) ) {
                                        IMetaTask mc = 
transaction.getMetaTask();
                                        mc.setUserSpaceException(errorAsString);
-                               } else {
-                                       // success
-                                       
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
                                }
+                               
                                // send END Request
                                callEnd(transaction);
                                if (running && Action.TERMINATE.equals(action)) 
{
@@ -275,16 +281,19 @@ public class DefaultServiceProtocolHandl
                        }               
                }
                stopLatch.countDown();
+               logger.log(Level.INFO,"ProtocolHandler stopped requesting new 
tasks - Stopping processor");
+
                if ( processor != null ) {
                        processor.stop();
                }
-               logger.log(Level.INFO,"ProtocolHandler terminated");
                return String.valueOf(Thread.currentThread().getId());
        }
 
        
        private void delegateStop() {
-               service.stop(); // dont quiesce
+       service.stop(); // dont quiesce
+               
+               //service.quiesceAndStop();
        }
        @Override
        public void stop() {
@@ -298,15 +307,14 @@ public class DefaultServiceProtocolHandl
        public void quiesceAndStop() {
                quiescing = true;
                running = false;
-               if ( logger.isLoggable(Level.INFO)) {
-                       logger.log(Level.INFO, this.getClass().getName()+" 
quiesceAndStop() called");
-               }
+               logger.log(Level.INFO, this.getClass().getName()+" 
quiesceAndStop() called");
                try {
                        // wait for process threads to terminate
                        stopLatch.await();
-                       logger.log(Level.INFO, this.getClass().getName()+" All 
process threads completed quiesce");
                } catch( Exception e ) {
+
                }
+               logger.log(Level.INFO, this.getClass().getName()+" All process 
threads completed quiesce");
        }
        @Override
        public void start() {

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
 Tue Jul 24 17:26:39 2018
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -49,6 +50,7 @@ import org.junit.After;
 public class Client {
        private Server server;
        private boolean block = false;
+       private AtomicLong errorCount = new AtomicLong();
        private final static String app="test";
        private int httpPort = 8080;
        private int maxThreads = 50;
@@ -162,6 +164,9 @@ public class Client {
                                        case End:
                                                System.out.println("---- Driver 
handling END Request - "+imt.getMetaTask().getAppData());
                                                
//handleMetaCasTransationEnd(trans, taskConsumer);
+                                               if ( 
imt.getMetaTask().getUserSpaceException() != null ) {
+                                                       
System.out.println("Client received error#"+errorCount.incrementAndGet());
+                                               }
                                                break;
                                        case InvestmentReset:
                                        //      
handleMetaCasTransationInvestmentReset(trans, rwt);
@@ -203,6 +208,9 @@ public class Client {
                        }
 
                }
+               public long getErrorCount() {
+                       return errorCount.get();
+               }
                private IMetaTask getMetaCas(String serializedCas) {
                        if ( serializedCas == null ) {
                                return null;

Added: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java?rev=1836574&view=auto
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
 (added)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
 Tue Jul 24 17:26:39 2018
@@ -0,0 +1,140 @@
+package org.apache.uima.ducc.ps.service;
+
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+public class JunitErrorHandlingTestSuite {
+
+       public JunitErrorHandlingTestSuite() {
+               
+               
+       }
+
+       @Test
+       public void testNoWindow() throws Exception {
+               // fails on 3rd error 
+               WindowBasedErrorHandler errorHandler = 
+                               new WindowBasedErrorHandler();
+               errorHandler.withMaxFrameworkErrors(3).build();
+               int taskCount = 10;
+               int failures=0;
+               for( int i = 0; i < taskCount; i++) {
+                       failures++;
+                       if ( errorHandler.exceededErrorWindow(i) ) {
+                               break;
+                       }
+               }
+               // there should be 3 failures
+               Assert.assertEquals(3, failures);
+       }
+       @Test
+       public void testDefaults() throws Exception {
+               // fails on 1st error 
+               WindowBasedErrorHandler errorHandler = 
+                               new WindowBasedErrorHandler();
+               errorHandler.build();
+               int taskCount = 10;
+               int failures=0;
+               for( int i = 0; i < taskCount; i++) {
+                       failures++;
+                       if ( errorHandler.exceededErrorWindow(i) ) {
+                               break;
+                       }
+               }
+               // there should be 1 failure
+               Assert.assertEquals(1, failures);
+       }
+       @Test
+       public void testErrorFirstTask() throws Exception {
+               // fails on 1st error
+               WindowBasedErrorHandler errorHandler = 
+                               new WindowBasedErrorHandler();
+               
errorHandler.withMaxFrameworkErrors(1).withProcessErrorWindow(1).build();
+               int taskCount = 10;
+               int failures=0;
+               for( int i = 0; i < taskCount; i++) {
+                       failures++;
+                       if ( errorHandler.exceededErrorWindow(i) ) {
+                               break;
+                       }
+               }
+               // there should be 1 failure
+               Assert.assertEquals(1, failures);
+       }
+       @Test
+       public void testZeroThresholdWindow() throws Exception {
+               WindowBasedErrorHandler errorHandler = 
+                               new WindowBasedErrorHandler();
+               errorHandler.withMaxFrameworkErrors(0).
+                            withProcessErrorWindow(5).build();
+               int taskCount = 10;
+               for( int i = 0; i < taskCount; i++) {
+                       // simulate failure on task3. The test should not fail 
since max errors is 0, meaning
+                       // don't ever fail.
+                       if (  i == 3 ) {
+                               if ( errorHandler.exceededErrorWindow(i) ) {
+                                       Assert.fail("Unexpected test failure - 
window=5 maxErrors=0 taskCount:"+i);
+                               }
+                       }
+               }
+       }
+       @Test
+       public void testErrorsLessWindow() throws Exception {
+               WindowBasedErrorHandler errorHandler = 
+                               new WindowBasedErrorHandler();
+               errorHandler.withMaxFrameworkErrors(3).
+                            withProcessErrorWindow(5).build();
+               int taskCount = 10;
+               for( int i = 0; i < taskCount; i++) {
+                       // simulate failure on task1, task3, and task9. Setup 
calls for failure
+                       // if 3 errors occur within 5 tasks window. In this 
case, task1 and task3
+                       // should not cause a failure. Also task9 should not 
cause error since
+                       // it's a new window.
+                       if ( i == 1 || i == 3 || i == 9) {
+                               if ( errorHandler.exceededErrorWindow(i) ) {
+                                       Assert.fail("Unexpected test failure - 
window=5 maxErrors=3 taskCount:"+i);
+                               }
+                       }
+               }
+       }
+       @Test
+       public void testErrorsExceedingWindow() throws Exception {
+               WindowBasedErrorHandler errorHandler = 
+                               new WindowBasedErrorHandler();
+               errorHandler.withMaxFrameworkErrors(3).
+                            withProcessErrorWindow(8).build();
+               int taskCount = 10;
+               for( int i = 0; i < taskCount; i++) {
+                       // simulate failure on task1, task3, and task7. Setup 
calls for failure
+                       // if 3 errors occur within 8 tasks window. In this 
case, task1, task3, and task7
+                       // should cause failure. 
+                       if ( i == 1 || i == 3 || i == 7) {
+                               if ( errorHandler.exceededErrorWindow(i) ) {
+                                       return;
+                               }
+                       }
+               }
+               Assert.fail("Unexpected test success - Should have failed - 
window=8 maxErrors=3 taskCount:"+taskCount);
+       }
+}

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
 Tue Jul 24 17:26:39 2018
@@ -1,4 +1,21 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
 package org.apache.uima.ducc.ps.service;
 
 import java.util.Timer;
@@ -7,16 +24,23 @@ import java.util.concurrent.CountDownLat
 
 import org.apache.uima.ducc.ps.Client;
 import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
 import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
 import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
 import org.apache.uima.ducc.ps.service.processor.uima.UimaServiceProcessor;
 import org.junit.Test;
 
 public class JunitPullServiceTestCase extends Client {
-
+       private static final long  DELAY=5000;
        CountDownLatch threadsReady;
        CountDownLatch stopLatch;
-
+       {
+               // instance initializer sets amount of time the service delays
+               // sending READY to a monitor
+               System.setProperty("ducc.service.init.delay", "3000");
+       }
        @Test
        public void testPullService() throws Exception {
                System.out.println("----------------- testPullService 
-------------------");
@@ -24,9 +48,10 @@ public class JunitPullServiceTestCase ex
                super.startJetty(false);  // don't block
                String analysisEngineDescriptor = "TestAAE";
                System.setProperty("ducc.deploy.JpType", "uima");
+               
                IServiceProcessor processor = new 
                                UimaServiceProcessor(analysisEngineDescriptor);
-
+               
                String tasURL = "http://localhost:8080/test";;
                
                IService service = 
PullServiceStepBuilder.newBuilder().withProcessor(processor)
@@ -37,7 +62,7 @@ public class JunitPullServiceTestCase ex
                        service.initialize();
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), 35000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), DELAY);
                        
                        service.start();
 
@@ -67,7 +92,7 @@ public class JunitPullServiceTestCase ex
                        service.initialize();
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, true), 
35000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer, true), 
DELAY);
                        
                        service.start();
 
@@ -98,7 +123,7 @@ public class JunitPullServiceTestCase ex
                        System.out.println("----------- Starting Service 
.....");
                        Timer fTimer = new Timer();
                        //after 10sec stop the service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), 40000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer, 
false), DELAY);
 
                        service.start();
 
@@ -111,8 +136,74 @@ public class JunitPullServiceTestCase ex
        }
        
        @Test
-       public void testPullServiceWithProcessFailure() throws Exception {
-               System.out.println("----------------- testPullServiceWithProcessFailure -------------------");
+       public void testStopOnFirstError() throws Exception {
+               System.out.println("----------------- testStopOnFirstError -------------------");
+               int scaleout = 10;
+               super.startJetty(false);  // don't block
+               String analysisEngineDescriptor = "NoOpAE";
+               System.setProperty("ducc.deploy.JpType", "uima");
+               
+               IServiceProcessor processor =
+                               new UimaServiceProcessor(analysisEngineDescriptor);
+               // fail on 1st error
+               processor.setErrorHandlerWindow(1,  5);
+
+               String tasURL = "http://localhost:8080/test";
+               
+               IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+                               .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+                               .withOptionalsDone().build();
+
+               try {
+                       System.setProperty("ProcessFail","2");
+                       service.initialize();
+                       
+                       service.start();
+
+               } catch (ServiceInitializationException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw e;
+               } finally {
+                       System.getProperties().remove("ProcessFail");
+               }
+       }
+       @Test
+       public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
+               System.out.println("----------------- testTerminateOn2ErrorsInWindowOf5 -------------------");
+               int scaleout = 10;
+               super.startJetty(false);  // don't block
+               String analysisEngineDescriptor = "NoOpAE";
+               System.setProperty("ducc.deploy.JpType", "uima");
+               
+               IServiceProcessor processor =
+                               new UimaServiceProcessor(analysisEngineDescriptor);
+               // fail on 2nd error in a window of 5
+               processor.setErrorHandlerWindow(2,  5);
+               String tasURL = "http://localhost:8080/test";
+               
+               IService service = PullServiceStepBuilder.newBuilder().withProcessor(processor)
+                               .withClientURL(tasURL).withType("Note Service").withScaleout(scaleout)
+                               .withOptionalsDone().build();
+
+               try {
+                       // fail task#1 and task#3 which should stop the test
+                       System.setProperty("ProcessFail","1,3");
+                       service.initialize();
+                       
+                       service.start();
+
+               } catch (ServiceInitializationException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw e;
+               } finally {
+                       System.getProperties().remove("ProcessFail");
+               }
+       }
+       @Test
+       public void testProcessFailureDefaultErrorHandler() throws Exception {
+               System.out.println("----------------- testProcessFailureDefaultErrorHandler -------------------");
                int scaleout = 2;
                super.startJetty(false);  // don't block
                String analysisEngineDescriptor = "NoOpAE";
@@ -126,11 +217,12 @@ public class JunitPullServiceTestCase ex
                                .withOptionalsDone().build();
 
                try {
-                        System.setProperty("ProcessFail","true");
+                       // fail on 2nd task. This should terminate the test
+                        System.setProperty("ProcessFail","2");
                        service.initialize();
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer, false), 35000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer, false), DELAY);
                        
                        service.start();
 
@@ -142,7 +234,7 @@ public class JunitPullServiceTestCase ex
                        System.getProperties().remove("ProcessFail");
                }
        }
-       
+
        /*
        @Test
        public void testPullServiceBadClientURL() throws Exception {

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java Tue Jul 24 17:26:39 2018
@@ -28,6 +28,12 @@ import org.apache.uima.ducc.ps.service.m
 import org.junit.Test;
 
 public class JUnitServiceWrapperTestCase extends Client  {
+       private static final long  DELAY=5000;
+       {
+               // static initializer sets amount of time the service delays
+               // sending READY to a monitor
+               System.setProperty("ducc.service.init.delay", "3000");
+       }
 
        @Test
        public void testPullServiceWrapper() throws Exception {
@@ -52,7 +58,7 @@ public class JUnitServiceWrapperTestCase
 
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 35000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer), 5000);
                                
                        service.initialize(new String[] {analysisEngineDescriptor});
 
@@ -81,19 +87,19 @@ public class JUnitServiceWrapperTestCase
 
                String tasURL = "http://localhost:8080/test";
                try {
-                       // Force process failure
-                       System.setProperty("ProcessFail","true");
+                       // Force process failure of the first task
+                       System.setProperty("ProcessFail","1");
                         
                        System.setProperty("ducc.deploy.JdURL", tasURL);
                        System.setProperty("ducc.deploy.JpThreadCount","4");
                        System.setProperty("ducc.deploy.service.type", "NotesService");
                        System.setProperty("ducc.deploy.JpType", "uima");
-
+                       // use default error window (1,1)
                        ServiceWrapper service = new ServiceWrapper();
 
                        Timer fTimer = new Timer("testPullService Timer");
                        // after 5secs stop the pull service
-                       fTimer.schedule(new MyTimerTask(service, fTimer), 35000);
+                       fTimer.schedule(new MyTimerTask(service, fTimer), 10000);
                                
                        service.initialize(new String[] {analysisEngineDescriptor});
 
@@ -113,24 +119,23 @@ public class JUnitServiceWrapperTestCase
        class MyTimerTask extends TimerTask {
                final ServiceWrapper service;
                final Timer fTimer;
+
                MyTimerTask(ServiceWrapper service, Timer fTimer) {
                        this.service = service;
                        this.fTimer = fTimer;
                }
-               
-                       @Override
-               
-                       public void run() {
-                               this.cancel();
-                               fTimer.purge();
-                               fTimer.cancel();
-                               System.out.println("Timmer popped - stopping service");
-                               service.stop();
-                               
-                       }
-               
-                
-               
-                   }
+
+               @Override
+
+               public void run() {
+                       this.cancel();
+                       fTimer.purge();
+                       fTimer.cancel();
+                       System.out.println("Timmer popped - stopping service");
+                       service.stop();
+
+               }
+
+       }
 
 }

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java Tue Jul 24 17:26:39 2018
@@ -23,6 +23,9 @@ import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UimaContext;
@@ -39,12 +42,18 @@ public class NoOpAE extends CasAnnotator
     Logger logger;
     static boolean initComplete = false;
     String AE_Identifier = "*^^^^^^^^^ AE ";
-
+    private static AtomicLong processCount = new AtomicLong();
+       String errorSequence;
 
     @Override
     public void initialize(UimaContext uimaContext) throws ResourceInitializationException
     {
+       processCount.set(0);
         super.initialize(uimaContext);
+       errorSequence = System.getProperty("ProcessFail");
+       if ( Objects.isNull(errorSequence)) {
+               errorSequence="";
+       }
 
         long tid = Thread.currentThread().getId();
 
@@ -103,10 +112,18 @@ public class NoOpAE extends CasAnnotator
     @Override
     public void process(CAS cas) throws AnalysisEngineProcessException 
     {
-       String data = cas.getSofaDataString();
-       if ( System.getProperty("ProcessFail") != null ) {
-               throw new AnalysisEngineProcessException(new RuntimeException("Simulated Exception"));
-       }
+       long val = processCount.incrementAndGet();
+       //String data = cas.getSofaDataString();
+               String[] errors = errorSequence.split(",");
+               synchronized(NoOpAE.class) {
+                       for( String inx : errors) {
+                               long errorSeq = Long.parseLong(inx.trim());
+                               if ( errorSeq == val ) {
+                                       System.out.println(">>>> Error: errorSeq:"+errorSeq+" processCount:"+val);
+                               throw new AnalysisEngineProcessException(new RuntimeException("Simulated Exception"));
+                               }
+                       }
+               }
     }
 
  


Reply via email to