Author: cwiklik
Date: Tue Jul 24 17:26:39 2018
New Revision: 1836574
URL: http://svn.apache.org/viewvc?rev=1836574&view=rev
Log:
UIMA-5843 - added support for error window
Added:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/ServiceConfiguration.java
Tue Jul 24 17:26:39 2018
@@ -44,8 +44,23 @@ public class ServiceConfiguration {
private String ccOverrides;
private String cmOverrides;
private String aeOverrides;
+ private String maxErrors;
+ private String errorWindowSize;
private ClassLoader sysCL=null;
+ public String getMaxErrors() {
+ return maxErrors;
+ }
+ public void setMaxErrors(String maxErrors) {
+ this.maxErrors = maxErrors;
+ }
+
+ public String getErrorWindowSize() {
+ return errorWindowSize;
+ }
+ public void setErrorWindowSize(String errorWindowSize) {
+ this.errorWindowSize = errorWindowSize;
+ }
public ClassLoader getSysCL() {
return sysCL;
}
@@ -217,6 +232,9 @@ public class ServiceConfiguration {
clientURL = System.getProperty("ducc.deploy.JdURL");
threadCount = System.getProperty("ducc.deploy.JpThreadCount");
serviceType = System.getProperty("ducc.deploy.service.type");
+ maxErrors =
System.getProperty("ducc.deploy.service.error.threshold");
+ errorWindowSize =
System.getProperty("ducc.deploy.service.error.window");
+
jpType = System.getProperty("ducc.deploy.JpType");
assignedJmxPort = System.getProperty("ducc.jmx.port");
customRegistryClass =
System.getProperty("ducc.deploy.registry.class");
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/builders/PullServiceStepBuilder.java
Tue Jul 24 17:26:39 2018
@@ -103,7 +103,6 @@ public final class PullServiceStepBuilde
public interface OptionalsStep {
public OptionalsStep withScaleout(int scaleout);
public OptionalsStep withType(String type);
-
public BuildStep withOptionalsDone();
}
public interface BuildStep {
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/DefaultErrorHandler.java
Tue Jul 24 17:26:39 2018
@@ -20,59 +20,19 @@
package org.apache.uima.ducc.ps.service.errors.builtin;
import org.apache.uima.ducc.ps.service.IServiceComponent;
-import org.apache.uima.ducc.ps.service.Lifecycle;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
public class DefaultErrorHandler implements IServiceErrorHandler {
- private int frameWorkErrorLimit=-1; // no limit
- private Action actionOnProcessError;
- private int windowSize = 0;
- private Action actionOnExceedsWindowSize;
- private Lifecycle lifecycleMonitor;
- private long errorCount=0;
- public DefaultErrorHandler(Action action) {
-
- this.actionOnProcessError = action;
- }
- public DefaultErrorHandler() {
- this.actionOnProcessError = Action.TERMINATE;
- }
-
- public DefaultErrorHandler withMaxFrameworkErrors(int
maxFrameworkError) {
- this.frameWorkErrorLimit = maxFrameworkError;
- return this;
- }
- public DefaultErrorHandler withProcessErrorWindow(int errorWindow,
Action errorAction ) {
- this.actionOnExceedsWindowSize = errorAction;
- this.windowSize = errorWindow;
- return this;
- }
- private boolean exceedsProcessWindow() {
- return (errorCount % windowSize == 0);
- }
@Override
public Action handleProcessError(Exception e, IServiceComponent source,
IWindowStats stats) {
-
return Action.TERMINATE;
}
@Override
public Action handle(Exception e, IServiceComponent source) {
- errorCount++;
- if ( exceedsProcessWindow() ) {
- Thread t = new Thread( new Runnable() {
- public void run() {
- lifecycleMonitor.stop();
- }
- });
- t.start();
-
- }
-
-
-
- return Action.TERMINATE; }
+ return Action.TERMINATE;
+ }
}
Added:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java?rev=1836574&view=auto
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
(added)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/errors/builtin/WindowBasedErrorHandler.java
Tue Jul 24 17:26:39 2018
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.ps.service.errors.builtin;
+
+import java.util.Arrays;
+
+import org.apache.uima.ducc.ps.service.IServiceComponent;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+
+public class WindowBasedErrorHandler implements IServiceErrorHandler {
+
+ private int errorThreshold=1;
+ private int windowSize = 1;
+ private long errorCount=0;
+ private long errorSequences[];
+
+
+ public WindowBasedErrorHandler withMaxFrameworkErrors(int
maxFrameworkError) {
+ this.errorThreshold = maxFrameworkError;
+ return this;
+ }
+ public WindowBasedErrorHandler withProcessErrorWindow(int errorWindow )
{
+ this.windowSize = errorWindow;
+ return this;
+ }
+ public WindowBasedErrorHandler build() {
+ if (windowSize >= errorThreshold && errorThreshold > 1) {
+ errorSequences = new long[errorThreshold - 1];
+ Arrays.fill(errorSequences, -windowSize);
+ }
+ return this;
+ }
+
+ public boolean exceededErrorWindow(long taskCount) {
+ if (errorThreshold == 0) {
+ return false;
+ }
+ ++errorCount;
+
+ // If no window check if total errors have REACHED the threshold
+ if (errorSequences == null) {
+ return (errorCount >= errorThreshold);
+ }
+ // Insert in array by replacing one that is outside the window.
+ int i = errorThreshold - 1;
+ while (--i >= 0) {
+ if (errorSequences[i] <= taskCount - windowSize) {
+ errorSequences[i] = taskCount;
+ return false;
+ }
+ }
+ // If insert fails then have reached threshold.
+ // Should not be called again after returning true, as it may then return false!
+ // But it may be called again if no action is specified, in which case it doesn't matter.
+ return true;
+ }
+
+ @Override
+ public Action handleProcessError(Exception e, IServiceComponent source,
IWindowStats stats) {
+ Action action = Action.CONTINUE;
+ if ( exceededErrorWindow(stats.getSuccessCount()) ) {
+ action = Action.TERMINATE;
+ }
+ return action;
+ }
+ @Override
+ public Action handle(Exception e, IServiceComponent source) {
+ return Action.TERMINATE;
+ }
+
+
+}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/examples/processor/CustomProcessorExample.java
Tue Jul 24 17:26:39 2018
@@ -51,4 +51,10 @@ public class CustomProcessorExample impl
logger.log(Level.INFO,"... getScaleout() called");
return 1;
}
+
+ @Override
+ public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+ logger.log(Level.INFO,"... setErrorHandlerWindow() called");
+
+ }
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
Tue Jul 24 17:26:39 2018
@@ -166,9 +166,6 @@ public class PullService implements ISer
// this down just before thread dies.
CountDownLatch stopLatch = new CountDownLatch(scaleout);
serviceProcessor.setScaleout(scaleout);
-// if ( serviceProcessor instanceof IScaleable ) {
-// ((IScaleable)
serviceProcessor).setScaleout(scaleout);
-// }
// add default protocol handler
protocolHandler =
new
DefaultServiceProtocolHandler.Builder()
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/ServiceWrapper.java
Tue Jul 24 17:26:39 2018
@@ -28,8 +28,11 @@ import org.apache.uima.ducc.ps.service.I
import org.apache.uima.ducc.ps.service.ServiceConfiguration;
import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
import org.apache.uima.ducc.ps.service.dgen.DeployableGeneration;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.errors.ServiceException;
import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
import org.apache.uima.ducc.ps.service.jmx.JMXAgent;
import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
import org.apache.uima.ducc.ps.service.processor.uima.UimaAsServiceProcessor;
@@ -71,6 +74,7 @@ public class ServiceWrapper {
*/
private IServiceProcessor createProcessor(String
analysisEngineDescriptorPath, String[] args)
throws ServiceInitializationException{
+ IServiceProcessor serviceProcessor=null;
if ( serviceConfiguration.getCustomProcessorClass() != null ) {
try {
Class<?> clz =
Class.forName(serviceConfiguration.getCustomProcessorClass());
@@ -78,20 +82,48 @@ public class ServiceWrapper {
if ( !IServiceProcessor.class.isAssignableFrom(clz) ) {
throw new
ServiceInitializationException(serviceConfiguration.getCustomProcessorClass()+"
Processor Class does not implement IServiceProcessor ");
}
- return (IServiceProcessor) clz.newInstance();
+ serviceProcessor = (IServiceProcessor)
clz.newInstance();
+ int maxErrors = 0;
+ int windowSize = 0;
+ if ( serviceConfiguration.getMaxErrors() != null ) {
+ maxErrors =
Integer.parseInt(serviceConfiguration.getMaxErrors());
+ }
+ if ( serviceConfiguration.getMaxErrors() != null ) {
+ windowSize =
Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+ }
+ serviceProcessor.setErrorHandlerWindow(maxErrors,
windowSize);
+
} catch( Exception e) {
logger.log(Level.WARNING,"",e);
throw new
ServiceInitializationException("Unable to instantiate Custom Processor from
class:"+serviceConfiguration.getCustomProcessorClass());
}
} else {
- if ( "uima".equals(serviceConfiguration.getJpType() )
){
- return new
UimaServiceProcessor(analysisEngineDescriptorPath, serviceConfiguration);
+ if ( "uima".equals(serviceConfiguration.getJpType() )
) {
+ serviceProcessor = new
UimaServiceProcessor(analysisEngineDescriptorPath, serviceConfiguration);
+
} else if (
"uima-as".equals(serviceConfiguration.getJpType()) ) {
- return new UimaAsServiceProcessor(args,
serviceConfiguration);
+ serviceProcessor = new
UimaAsServiceProcessor(args, serviceConfiguration);
+
} else {
throw new RuntimeException("Invalid deployment.
Set either -Dducc.deploy.JpType=[uima,uima-as] or provide
-Dducc.deploy.custom.processor.class=XX where XX implements IServiceProcessor
");
}
}
+ return serviceProcessor;
+ }
+ private IServiceErrorHandler getErrorHandler() {
+ int maxErrors = 1;
+ int windowSize = 1;
+
+ if ( serviceConfiguration.getMaxErrors() != null ) {
+ maxErrors =
Integer.parseInt(serviceConfiguration.getMaxErrors());
+ }
+ if ( serviceConfiguration.getErrorWindowSize() != null ) {
+ windowSize =
Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+ }
+ // Window-based error handler; with the default maxErrors=1 it terminates the service on the 1st error
+ return new WindowBasedErrorHandler()
+ .withMaxFrameworkErrors(maxErrors)
+ .withProcessErrorWindow(windowSize).build();
}
/**
* Check if AE descriptor is provided or we need to create it from parts
Added:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java?rev=1836574&view=auto
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
(added)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/metrics/builtin/ProcessWindowStats.java
Tue Jul 24 17:26:39 2018
@@ -0,0 +1,50 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.ps.service.metrics.builtin;
+
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+
+public class ProcessWindowStats implements IWindowStats {
+ private final long errorCount;
+ private final long successCount;
+ private final long errorsSinceLastSuccess;
+
+ public ProcessWindowStats(long errorCount, long successCount, long
errorsSinceLastSuccess) {
+ this.errorCount = errorCount;
+ this.successCount = successCount;
+ this.errorsSinceLastSuccess = errorsSinceLastSuccess;
+ }
+
+ @Override
+ public long getErrorCount() {
+ return errorCount;
+ }
+
+ @Override
+ public long getSuccessCount() {
+ return successCount;
+ }
+
+ @Override
+ public long getErrorCountSinceLastSuccess() {
+ return errorsSinceLastSuccess;
+ }
+
+}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/IServiceProcessor.java
Tue Jul 24 17:26:39 2018
@@ -33,4 +33,6 @@ public interface IServiceProcessor exten
public void setScaleout(int scaleout);
public int getScaleout();
+
+ public void setErrorHandlerWindow(int maxErrors, int windowSize);
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/AbstractServiceProcessor.java
Tue Jul 24 17:26:39 2018
@@ -20,7 +20,10 @@ package org.apache.uima.ducc.ps.service.
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
import org.apache.uima.ducc.ps.service.utils.UimaSerializer;
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
@@ -28,12 +31,32 @@ import org.apache.uima.util.Logger;
public class AbstractServiceProcessor {
// Map to store DuccUimaSerializer instances. Each has affinity to a
thread
protected Map<Long, UimaSerializer> serializerMap = new HashMap<>();
+ protected AtomicLong errorCount = new AtomicLong();
+ protected AtomicLong successCount = new AtomicLong();
+ protected AtomicLong errorCountSinceLastSuccess = new AtomicLong();
+ protected int maxErrors=1; // default is to fail on 1st error
+ protected int windowSize=1;
+ protected int DEFAULT_INIT_DELAY=30000;
+
+ protected IServiceErrorHandler getErrorHandler() {
+ // concrete implementation of this abstract class should
+ // provide a way to set maxErrors and windowSize. The IServiceProcessor
+ // provides a setter method for overriding default values
+ return new WindowBasedErrorHandler().
+ withMaxFrameworkErrors(maxErrors).
+ withProcessErrorWindow(windowSize).build();
+
+ }
protected void delay(Logger logger, long howLong) {
+ long delay = DEFAULT_INIT_DELAY;
+ if ( System.getProperty("ducc.service.init.delay") != null ) {
+ delay =
Long.parseLong(System.getProperty("ducc.service.init.delay").trim());
+ }
logger.log(Level.INFO, "Wait for the initialized state to
propagate to the SM " +
- "so any processing errors are not treates as
initialization failures");
+ "so any processing errors are not treates as
initialization failures - delay="+delay);
try {
- Thread.sleep(30000);
+ Thread.sleep(delay);
} catch (InterruptedException e1) {
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaAsServiceProcessor.java
Tue Jul 24 17:26:39 2018
@@ -41,8 +41,11 @@ import org.apache.uima.ducc.ps.service.I
import org.apache.uima.ducc.ps.service.ServiceConfiguration;
import org.apache.uima.ducc.ps.service.dgen.DeployableGenerator;
import org.apache.uima.ducc.ps.service.dgen.DuccUimaReferenceByName;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+import org.apache.uima.ducc.ps.service.metrics.builtin.ProcessWindowStats;
import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
import org.apache.uima.ducc.ps.service.processor.IProcessResult;
@@ -62,7 +65,6 @@ public class UimaAsServiceProcessor exte
Logger logger = UIMAFramework.getLogger(UimaServiceProcessor.class);
// Map to store DuccUimaSerializer instances. Each has affinity to a
thread
-// private Map<Long, UimaSerializer> serializerMap = new HashMap<>();
private static Object brokerInstance = null;
private UimaAsClientWrapper uimaASClient = null;
private String saxonURL = null;
@@ -82,13 +84,13 @@ public class UimaAsServiceProcessor exte
private static Object platformMBeanServer;
private ServiceConfiguration serviceConfiguration;
private IServiceMonitor monitor;
+ private IServiceErrorHandler errorHandler;
public volatile boolean initialized = false;
private String[] deploymentDescriptors = null;
private String[] ids = null;
private String duccHome = null;
boolean enablePerformanceBreakdownReporting = false;
private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
-
static {
// try to get platform MBean Server (Java 1.5 only)
try {
@@ -101,12 +103,12 @@ public class UimaAsServiceProcessor exte
}
private String[] args;
+
public UimaAsServiceProcessor(String[] args, ServiceConfiguration
serviceConfiguration) {
this.args = args;
this.serviceConfiguration = serviceConfiguration;
// start a thread which will collect AE initialization state
launchStateInitializationCollector();
-
}
@Override
@@ -118,7 +120,10 @@ public class UimaAsServiceProcessor exte
public int getScaleout() {
return scaleout;
}
-
+ public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+ this.maxErrors = maxErrors;
+ this.windowSize = windowSize;
+ }
private int generateDescriptorsAndGetScaleout(String[] args) throws
Exception {
deploymentDescriptors = getDescriptors(args);
ids = new String[deploymentDescriptors.length];
@@ -157,6 +162,7 @@ public class UimaAsServiceProcessor exte
}
// Needed to resolve ${queue.name} placeholder in DD
generated by DUCC
System.setProperty(queuePropertyName, endpointName);
+ errorHandler = getErrorHandler();
// generate Spring context file once
synchronized (UimaAsServiceProcessor.class) {
@@ -195,7 +201,7 @@ public class UimaAsServiceProcessor exte
doDeploy();
}
if ( numberOfInitializedThreads.incrementAndGet() ==
scaleout ) {
- super.delay(logger, 30000);
+ super.delay(logger, DEFAULT_INIT_DELAY);
monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
}
@@ -232,21 +238,29 @@ public class UimaAsServiceProcessor exte
matcher.appendTail(sb);
return sb.toString();
}
+
+ private CAS getCAS(String serializedTask) throws Exception {
+ CAS cas = uimaASClient.getCAS();
+ // DUCC JP services are given a serialized CAS ... others just the doc-text for
+ // a CAS
+ if (serviceConfiguration.getJpType() != null) {
+ // Use thread dedicated UimaSerializer to de-serialize the CAS
+
getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
+ } else {
+ cas.setDocumentText(serializedTask);
+ cas.setDocumentLanguage("en");
+ }
+ return cas;
+ }
+
+
@Override
public IProcessResult process(String serializedTask) {
CAS cas = null;
IProcessResult result;
try {
- cas = uimaASClient.getCAS();
- // DUCC JP services are given a serialized CAS ...
others just the doc-text for a CAS
- if (serviceConfiguration.getJpType() != null) {
- // Use thread dedicated UimaSerializer to
de-serialize the CAS
-
getUimaSerializer().deserializeCasFromXmi(serializedTask, cas);
- } else {
- cas.setDocumentText(serializedTask);
- cas.setDocumentLanguage("en");
- }
+ cas = getCAS(serializedTask);
List<PerformanceMetrics> casMetrics = new ArrayList<>();
if (enablePerformanceBreakdownReporting) {
@@ -254,9 +268,18 @@ public class UimaAsServiceProcessor exte
try {
uimaASClient.sendAndReceive(cas,
perfMetrics);
+ successCount.incrementAndGet();
+ errorCountSinceLastSuccess.set(0);
+
} catch (Exception t) {
logger.log(Level.WARNING, "", t);
- result = new UimaProcessResult(t,
Action.TERMINATE);
+ IWindowStats stats =
+ new
ProcessWindowStats(errorCount.incrementAndGet(),
+
successCount.get(),
+
errorCountSinceLastSuccess.incrementAndGet());
+ Action action =
+
errorHandler.handleProcessError(t, this, stats);
+ result = new UimaProcessResult(t,
action);
return result;
}
@@ -280,10 +303,21 @@ public class UimaAsServiceProcessor exte
} else {
// delegate processing to the UIMA-AS service
and wait for a reply
try {
+
uimaASClient.sendAndReceive(cas);
+ successCount.incrementAndGet();
+ errorCountSinceLastSuccess.set(0);
+
} catch (Exception t) {
logger.log(Level.WARNING, "", t);
- result = new UimaProcessResult(t,
Action.TERMINATE);
+ IWindowStats stats =
+ new
ProcessWindowStats(errorCount.incrementAndGet(),
+
successCount.get(),
+
errorCountSinceLastSuccess.incrementAndGet());
+ Action action =
+
errorHandler.handleProcessError(t, this, stats);
+
+ result = new UimaProcessResult(t,
action);
return result;
}
PerformanceMetrics pm = new PerformanceMetrics(
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaProcessResult.java
Tue Jul 24 17:26:39 2018
@@ -20,6 +20,7 @@ package org.apache.uima.ducc.ps.service.
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.Objects;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
import org.apache.uima.ducc.ps.service.processor.IProcessResult;
@@ -47,6 +48,9 @@ public class UimaProcessResult implement
}
@Override
public String getError() {
+ if ( Objects.isNull(exception)) {
+ return null;
+ }
StringWriter sw = new StringWriter();
exception.printStackTrace(new PrintWriter(sw));
return sw.toString();
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/processor/uima/UimaServiceProcessor.java
Tue Jul 24 17:26:39 2018
@@ -21,7 +21,6 @@ package org.apache.uima.ducc.ps.service.
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
//import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +35,8 @@ import org.apache.uima.ducc.ps.service.S
//import org.apache.uima.ducc.ps.service.dgen.DeployableGeneration;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.metrics.IWindowStats;
+import org.apache.uima.ducc.ps.service.metrics.builtin.ProcessWindowStats;
//import org.apache.uima.ducc.ps.service.jmx.JmxAEProcessInitMonitor;
import org.apache.uima.ducc.ps.service.monitor.IServiceMonitor;
import org.apache.uima.ducc.ps.service.monitor.builtin.RemoteStateObserver;
@@ -77,6 +78,7 @@ public class UimaServiceProcessor extend
private ServiceConfiguration serviceConfiguration;
private IServiceMonitor monitor;
private AtomicInteger numberOfInitializedThreads = new AtomicInteger();
+ private IServiceErrorHandler errorHandler;
static {
// try to get platform MBean Server (Java 1.5 only)
@@ -104,8 +106,25 @@ public class UimaServiceProcessor extend
if (serviceConfiguration.getJpType() != null) {
serializerMap = new HashMap<>();
}
+ // check if max errors override has been set via -D
+ if ( serviceConfiguration.getMaxErrors() != null ) {
+ this.maxErrors =
Integer.parseInt(serviceConfiguration.getMaxErrors());
+ }
+ // check if error window override has been set via -D
+ if ( serviceConfiguration.getErrorWindowSize() != null ) {
+ this.windowSize =
Integer.parseInt(serviceConfiguration.getErrorWindowSize());
+ }
+ }
+ /*
+ * Defines error handling parameters
+ *
+ * @param maxErrors - maximum error threshold within an error window
+ * @param windowSize - error window size
+ */
+ public void setErrorHandlerWindow(int maxErrors, int windowSize) {
+ this.maxErrors = maxErrors;
+ this.windowSize = windowSize;
}
-
private void launchStateInitializationCollector() {
monitor =
new RemoteStateObserver(serviceConfiguration,
logger);
@@ -124,6 +143,8 @@ public class UimaServiceProcessor extend
logger.log(Level.FINE, "Process Thread:"+
Thread.currentThread().getName()+" Initializing AE");
}
+ errorHandler = getErrorHandler();
+
try {
// multiple threads may call this method. Send
initializing state once
initStateLock.lockInterruptibly();
@@ -176,7 +197,7 @@ public class UimaServiceProcessor extend
}
if ( numberOfInitializedThreads.incrementAndGet() == scaleout )
{
- super.delay(logger, 30000);
+ super.delay(logger, DEFAULT_INIT_DELAY);
monitor.onStateChange(IServiceState.State.Running.toString(), new Properties());
}
}
@@ -189,10 +210,6 @@ public class UimaServiceProcessor extend
casPool = new CasPool(scaleout, analysisEngineMetadata, rm);
}
-// private UimaSerializer getUimaSerializer() {
-//
-// return serializerMap.get(Thread.currentThread().getId());
-// }
@Override
public IProcessResult process(String serializedTask) {
AnalysisEngine ae = null;
@@ -229,13 +246,22 @@ public class UimaServiceProcessor extend
// get the delta
List<PerformanceMetrics> casMetrics =
UimaMetricsGenerator.getDelta(
afterAnalysis, beforeAnalysis);
+
+ successCount.incrementAndGet();
+ errorCountSinceLastSuccess.set(0);
return new
UimaProcessResult(resultSerializer.serialize(casMetrics));
} catch( Exception e ) {
logger.log(Level.WARNING,"",e);
- result = new UimaProcessResult(e, Action.TERMINATE);
+ IWindowStats stats =
+ new
ProcessWindowStats(errorCount.incrementAndGet(),
+ successCount.get(),
+
errorCountSinceLastSuccess.incrementAndGet());
+ Action action =
+ errorHandler.handleProcessError(e,
this, stats);
+
+ result = new UimaProcessResult(e, action);
return result;
- }
- finally {
+ } finally {
if (cas != null) {
casPool.releaseCas(cas);
@@ -245,7 +271,7 @@ public class UimaServiceProcessor extend
public void setErrorHandler(IServiceErrorHandler errorHandler) {
-
+ this.errorHandler = errorHandler;
}
@Override
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
Tue Jul 24 17:26:39 2018
@@ -19,6 +19,7 @@
package org.apache.uima.ducc.ps.service.protocol.builtin;
import java.io.InvalidClassException;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -237,15 +238,20 @@ public class DefaultServiceProtocolHandl
// assume success
Action action = Action.CONTINUE;
+ // check if process error occurred.
+ String errorAsString = processResult.getError();
+
if (processResult.terminateProcess()) {
action = Action.TERMINATE;
- String errorAsString =
processResult.getError();
+ } else if ( Objects.isNull(errorAsString)){
+ // success
+
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
+ }
+ if ( Objects.nonNull(errorAsString ) ) {
IMetaTask mc =
transaction.getMetaTask();
mc.setUserSpaceException(errorAsString);
- } else {
- // success
-
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
}
+
// send END Request
callEnd(transaction);
if (running && Action.TERMINATE.equals(action))
{
@@ -275,16 +281,19 @@ public class DefaultServiceProtocolHandl
}
}
stopLatch.countDown();
+ logger.log(Level.INFO,"ProtocolHandler stopped requesting new
tasks - Stopping processor");
+
if ( processor != null ) {
processor.stop();
}
- logger.log(Level.INFO,"ProtocolHandler terminated");
return String.valueOf(Thread.currentThread().getId());
}
private void delegateStop() {
- service.stop(); // dont quiesce
+ service.stop(); // dont quiesce
+
+ //service.quiesceAndStop();
}
@Override
public void stop() {
@@ -298,15 +307,14 @@ public class DefaultServiceProtocolHandl
public void quiesceAndStop() {
quiescing = true;
running = false;
- if ( logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, this.getClass().getName()+"
quiesceAndStop() called");
- }
+ logger.log(Level.INFO, this.getClass().getName()+"
quiesceAndStop() called");
try {
// wait for process threads to terminate
stopLatch.await();
- logger.log(Level.INFO, this.getClass().getName()+" All
process threads completed quiesce");
} catch( Exception e ) {
+
}
+ logger.log(Level.INFO, this.getClass().getName()+" All process
threads completed quiesce");
}
@Override
public void start() {
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
Tue Jul 24 17:26:39 2018
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.ps;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -49,6 +50,7 @@ import org.junit.After;
public class Client {
private Server server;
private boolean block = false;
+ private AtomicLong errorCount = new AtomicLong();
private final static String app="test";
private int httpPort = 8080;
private int maxThreads = 50;
@@ -162,6 +164,9 @@ public class Client {
case End:
System.out.println("---- Driver
handling END Request - "+imt.getMetaTask().getAppData());
//handleMetaCasTransationEnd(trans, taskConsumer);
+ if (
imt.getMetaTask().getUserSpaceException() != null ) {
+
System.out.println("Client received error#"+errorCount.incrementAndGet());
+ }
break;
case InvestmentReset:
//
handleMetaCasTransationInvestmentReset(trans, rwt);
@@ -203,6 +208,9 @@ public class Client {
}
}
+ public long getErrorCount() {
+ return errorCount.get();
+ }
private IMetaTask getMetaCas(String serializedCas) {
if ( serializedCas == null ) {
return null;
Added:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java?rev=1836574&view=auto
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
(added)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitErrorHandlingTestSuite.java
Tue Jul 24 17:26:39 2018
@@ -0,0 +1,140 @@
+package org.apache.uima.ducc.ps.service;
+
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+public class JunitErrorHandlingTestSuite {
+
+ public JunitErrorHandlingTestSuite() {
+
+
+ }
+
+ @Test
+ public void testNoWindow() throws Exception {
+ // fails on 3rd error
+ WindowBasedErrorHandler errorHandler =
+ new WindowBasedErrorHandler();
+ errorHandler.withMaxFrameworkErrors(3).build();
+ int taskCount = 10;
+ int failures=0;
+ for( int i = 0; i < taskCount; i++) {
+ failures++;
+ if ( errorHandler.exceededErrorWindow(i) ) {
+ break;
+ }
+ }
+ // there should be 3 failures
+ Assert.assertEquals(3, failures);
+ }
+ @Test
+ public void testDefaults() throws Exception {
+ // fails on 1st error
+ WindowBasedErrorHandler errorHandler =
+ new WindowBasedErrorHandler();
+ errorHandler.build();
+ int taskCount = 10;
+ int failures=0;
+ for( int i = 0; i < taskCount; i++) {
+ failures++;
+ if ( errorHandler.exceededErrorWindow(i) ) {
+ break;
+ }
+ }
+ // there should be 1 failure
+ Assert.assertEquals(1, failures);
+ }
+ @Test
+ public void testErrorFirstTask() throws Exception {
+ // fails on 1st error
+ WindowBasedErrorHandler errorHandler =
+ new WindowBasedErrorHandler();
+
errorHandler.withMaxFrameworkErrors(1).withProcessErrorWindow(1).build();
+ int taskCount = 10;
+ int failures=0;
+ for( int i = 0; i < taskCount; i++) {
+ failures++;
+ if ( errorHandler.exceededErrorWindow(i) ) {
+ break;
+ }
+ }
+ // there should be 1 failure
+ Assert.assertEquals(1, failures);
+ }
+ @Test
+ public void testZeroThresholdWindow() throws Exception {
+ WindowBasedErrorHandler errorHandler =
+ new WindowBasedErrorHandler();
+ errorHandler.withMaxFrameworkErrors(0).
+ withProcessErrorWindow(5).build();
+ int taskCount = 10;
+ for( int i = 0; i < taskCount; i++) {
+ // simulate failure on task3. The test should not fail
since max errors is 0, meaning
+ // don't ever fail.
+ if ( i == 3 ) {
+ if ( errorHandler.exceededErrorWindow(i) ) {
+ Assert.fail("Unexpected test failure -
window=5 maxErrors=0 taskCount:"+i);
+ }
+ }
+ }
+ }
+ @Test
+ public void testErrorsLessWindow() throws Exception {
+ WindowBasedErrorHandler errorHandler =
+ new WindowBasedErrorHandler();
+ errorHandler.withMaxFrameworkErrors(3).
+ withProcessErrorWindow(5).build();
+ int taskCount = 10;
+ for( int i = 0; i < taskCount; i++) {
+ // simulate failure on task1, task3, and task9. Setup
calls for failure
+ // if 3 errors occur within 5 tasks window. In this
case, task1 and task3
+ // should not cause a failure. Also task9 should not
cause error since
+ // its a new window.
+ if ( i == 1 || i == 3 || i == 9) {
+ if ( errorHandler.exceededErrorWindow(i) ) {
+ Assert.fail("Unexpected test failure -
window=5 maxErrors=3 taskCount:"+i);
+ }
+ }
+ }
+ }
+ @Test
+ public void testErrorsExceedingWindow() throws Exception {
+ WindowBasedErrorHandler errorHandler =
+ new WindowBasedErrorHandler();
+ errorHandler.withMaxFrameworkErrors(3).
+ withProcessErrorWindow(8).build();
+ int taskCount = 10;
+ for( int i = 0; i < taskCount; i++) {
+ // simulate failure on task1, task3, and task7. Setup
calls for failure
+ // if 3 errors occur within 8 tasks window. In this
case, task1, task3, and task7
+ // should cause failure.
+ if ( i == 1 || i == 3 || i == 7) {
+ if ( errorHandler.exceededErrorWindow(i) ) {
+ return;
+ }
+ }
+ }
+ Assert.fail("Unexpected test success - Should have failed -
window=8 maxErrors=3 taskCount:"+taskCount);
+ }
+}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
Tue Jul 24 17:26:39 2018
@@ -1,4 +1,21 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
package org.apache.uima.ducc.ps.service;
import java.util.Timer;
@@ -7,16 +24,23 @@ import java.util.concurrent.CountDownLat
import org.apache.uima.ducc.ps.Client;
import org.apache.uima.ducc.ps.service.builders.PullServiceStepBuilder;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler.Action;
+import org.apache.uima.ducc.ps.service.errors.builtin.WindowBasedErrorHandler;
import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
import org.apache.uima.ducc.ps.service.processor.uima.UimaServiceProcessor;
import org.junit.Test;
public class JunitPullServiceTestCase extends Client {
-
+ private static final long DELAY=5000;
CountDownLatch threadsReady;
CountDownLatch stopLatch;
-
+ {
+ // instance initializer sets amount of time the service delays
+ // sending READY to a monitor
+ System.setProperty("ducc.service.init.delay", "3000");
+ }
@Test
public void testPullService() throws Exception {
System.out.println("----------------- testPullService
-------------------");
@@ -24,9 +48,10 @@ public class JunitPullServiceTestCase ex
super.startJetty(false); // don't block
String analysisEngineDescriptor = "TestAAE";
System.setProperty("ducc.deploy.JpType", "uima");
+
IServiceProcessor processor = new
UimaServiceProcessor(analysisEngineDescriptor);
-
+
String tasURL = "http://localhost:8080/test";
IService service =
PullServiceStepBuilder.newBuilder().withProcessor(processor)
@@ -37,7 +62,7 @@ public class JunitPullServiceTestCase ex
service.initialize();
Timer fTimer = new Timer("testPullService Timer");
// after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer,
false), 35000);
+ fTimer.schedule(new MyTimerTask(service, fTimer,
false), DELAY);
service.start();
@@ -67,7 +92,7 @@ public class JunitPullServiceTestCase ex
service.initialize();
Timer fTimer = new Timer("testPullService Timer");
// after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer, true),
35000);
+ fTimer.schedule(new MyTimerTask(service, fTimer, true),
DELAY);
service.start();
@@ -98,7 +123,7 @@ public class JunitPullServiceTestCase ex
System.out.println("----------- Starting Service
.....");
Timer fTimer = new Timer();
//after 10sec stop the service
- fTimer.schedule(new MyTimerTask(service, fTimer,
false), 40000);
+ fTimer.schedule(new MyTimerTask(service, fTimer,
false), DELAY);
service.start();
@@ -111,8 +136,74 @@ public class JunitPullServiceTestCase ex
}
@Test
- public void testPullServiceWithProcessFailure() throws Exception {
- System.out.println("-----------------
testPullServiceWithProcessFailure -------------------");
+ public void testStopOnFirstError() throws Exception {
+ System.out.println("----------------- testStopOnFirstError
-------------------");
+ int scaleout = 10;
+ super.startJetty(false); // don't block
+ String analysisEngineDescriptor = "NoOpAE";
+ System.setProperty("ducc.deploy.JpType", "uima");
+
+ IServiceProcessor processor =
+ new
UimaServiceProcessor(analysisEngineDescriptor);
+ // fail on 1st error
+ processor.setErrorHandlerWindow(1, 5);
+
+ String tasURL = "http://localhost:8080/test";
+
+ IService service =
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+ .withClientURL(tasURL).withType("Note
Service").withScaleout(scaleout)
+ .withOptionalsDone().build();
+
+ try {
+ System.setProperty("ProcessFail","2");
+ service.initialize();
+
+ service.start();
+
+ } catch (ServiceInitializationException e) {
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ System.getProperties().remove("ProcessFail");
+ }
+ }
+ @Test
+ public void testTerminateOn2ErrorsInWindowOf5() throws Exception {
+ System.out.println("-----------------
testTerminateOn2ErrorsInWindowOf5 -------------------");
+ int scaleout = 10;
+ super.startJetty(false); // don't block
+ String analysisEngineDescriptor = "NoOpAE";
+ System.setProperty("ducc.deploy.JpType", "uima");
+
+ IServiceProcessor processor =
+ new
UimaServiceProcessor(analysisEngineDescriptor);
+ // fail on 2nd error in a window of 5
+ processor.setErrorHandlerWindow(2, 5);
+ String tasURL = "http://localhost:8080/test";
+
+ IService service =
PullServiceStepBuilder.newBuilder().withProcessor(processor)
+ .withClientURL(tasURL).withType("Note
Service").withScaleout(scaleout)
+ .withOptionalsDone().build();
+
+ try {
+ // fail task#1 and task#3 which should stop the test
+ System.setProperty("ProcessFail","1,3");
+ service.initialize();
+
+ service.start();
+
+ } catch (ServiceInitializationException e) {
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ System.getProperties().remove("ProcessFail");
+ }
+ }
+ @Test
+ public void testProcessFailureDefaultErrorHandler() throws Exception {
+ System.out.println("-----------------
testProcessFailureDefaultErrorHandler -------------------");
int scaleout = 2;
super.startJetty(false); // don't block
String analysisEngineDescriptor = "NoOpAE";
@@ -126,11 +217,12 @@ public class JunitPullServiceTestCase ex
.withOptionalsDone().build();
try {
- System.setProperty("ProcessFail","true");
+ // fail on 2nd task. This should terminate the test
+ System.setProperty("ProcessFail","2");
service.initialize();
Timer fTimer = new Timer("testPullService Timer");
// after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer,
false), 35000);
+ fTimer.schedule(new MyTimerTask(service, fTimer,
false), DELAY);
service.start();
@@ -142,7 +234,7 @@ public class JunitPullServiceTestCase ex
System.getProperties().remove("ProcessFail");
}
}
-
+
/*
@Test
public void testPullServiceBadClientURL() throws Exception {
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
Tue Jul 24 17:26:39 2018
@@ -28,6 +28,12 @@ import org.apache.uima.ducc.ps.service.m
import org.junit.Test;
public class JUnitServiceWrapperTestCase extends Client {
+ private static final long DELAY=5000;
+ {
+ // instance initializer sets amount of time the service delays
+ // sending READY to a monitor
+ System.setProperty("ducc.service.init.delay", "3000");
+ }
@Test
public void testPullServiceWrapper() throws Exception {
@@ -52,7 +58,7 @@ public class JUnitServiceWrapperTestCase
Timer fTimer = new Timer("testPullService Timer");
// after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer),
35000);
+ fTimer.schedule(new MyTimerTask(service, fTimer), 5000);
service.initialize(new String[]
{analysisEngineDescriptor});
@@ -81,19 +87,19 @@ public class JUnitServiceWrapperTestCase
String tasURL = "http://localhost:8080/test";
try {
- // Force process failure
- System.setProperty("ProcessFail","true");
+ // Force process failure of the first task
+ System.setProperty("ProcessFail","1");
System.setProperty("ducc.deploy.JdURL", tasURL);
System.setProperty("ducc.deploy.JpThreadCount","4");
System.setProperty("ducc.deploy.service.type",
"NotesService");
System.setProperty("ducc.deploy.JpType", "uima");
-
+ // use default error window (1,1)
ServiceWrapper service = new ServiceWrapper();
Timer fTimer = new Timer("testPullService Timer");
// after 5secs stop the pull service
- fTimer.schedule(new MyTimerTask(service, fTimer),
35000);
+ fTimer.schedule(new MyTimerTask(service, fTimer),
10000);
service.initialize(new String[]
{analysisEngineDescriptor});
@@ -113,24 +119,23 @@ public class JUnitServiceWrapperTestCase
class MyTimerTask extends TimerTask {
final ServiceWrapper service;
final Timer fTimer;
+
MyTimerTask(ServiceWrapper service, Timer fTimer) {
this.service = service;
this.fTimer = fTimer;
}
-
- @Override
-
- public void run() {
- this.cancel();
- fTimer.purge();
- fTimer.cancel();
- System.out.println("Timmer popped - stopping
service");
- service.stop();
-
- }
-
-
-
- }
+
+ @Override
+
+ public void run() {
+ this.cancel();
+ fTimer.purge();
+ fTimer.cancel();
+ System.out.println("Timmer popped - stopping service");
+ service.stop();
+
+ }
+
+ }
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1836574&r1=1836573&r2=1836574&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
Tue Jul 24 17:26:39 2018
@@ -23,6 +23,9 @@ import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.uima.UIMAFramework;
import org.apache.uima.UimaContext;
@@ -39,12 +42,18 @@ public class NoOpAE extends CasAnnotator
Logger logger;
static boolean initComplete = false;
String AE_Identifier = "*^^^^^^^^^ AE ";
-
+ private static AtomicLong processCount = new AtomicLong();
+ String errorSequence;
@Override
public void initialize(UimaContext uimaContext) throws
ResourceInitializationException
{
+ processCount.set(0);
super.initialize(uimaContext);
+ errorSequence = System.getProperty("ProcessFail");
+ if ( Objects.isNull(errorSequence)) {
+ errorSequence="";
+ }
long tid = Thread.currentThread().getId();
@@ -103,10 +112,28 @@ public class NoOpAE extends CasAnnotator
@Override
public void process(CAS cas) throws AnalysisEngineProcessException
{
- String data = cas.getSofaDataString();
- if ( System.getProperty("ProcessFail") != null ) {
- throw new AnalysisEngineProcessException(new
RuntimeException("Simulated Exception"));
- }
+ // Simulates task failures: throws on the Nth process() call, as counted
+ // by the shared static processCount, for every N listed in the
+ // comma-separated ProcessFail system property read in initialize().
+ long val = processCount.incrementAndGet();
+ //String data = cas.getSofaDataString();
+ String[] errors = errorSequence.split(",");
+ synchronized(NoOpAE.class) {
+ for( String inx : errors) {
+ String token = inx.trim();
+ // errorSequence is "" when ProcessFail is not set, and
+ // "".split(",") yields [""]; parsing that blank token would
+ // throw NumberFormatException on every call, so skip it.
+ if ( token.isEmpty() ) {
+ continue;
+ }
+ long errorSeq = Long.parseLong(token);
+ if ( errorSeq == val ) {
+ System.out.println(">>>> Error:
errorSeq:"+errorSeq+" processCount:"+val);
+ throw new AnalysisEngineProcessException(new
RuntimeException("Simulated Exception"));
+ }
+ }
+ }
}