Author: cwiklik
Date: Wed May 1 13:42:39 2019
New Revision: 1858488
URL: http://svn.apache.org/viewvc?rev=1858488&view=rev
Log:
UIMA-6026: Removed synchronization from the client and pull service. Also created
an XStream instance per thread and pinned it via ThreadLocal.
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/http/HttpServiceTransport.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/Client.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/JunitPullServiceTestCase.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/service/wrapper/JUnitServiceWrapperTestCase.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/DuccServiceTaskProtocolHandler.java
Wed May 1 13:42:39 2019
@@ -1,20 +1,20 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.uima.ducc.ps.sd.task;
@@ -42,138 +42,145 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
public class DuccServiceTaskProtocolHandler implements TaskProtocolHandler {
- Logger logger =
UIMAFramework.getLogger(DuccServiceTaskProtocolHandler.class);
- private volatile boolean running = true;;
- private static AtomicInteger atomicCounter =
- new AtomicInteger(0);
-
- public DuccServiceTaskProtocolHandler(TaskAllocatorCallbackListener
taskAllocator) {
- }
-
- @Override
- public String initialize(Properties props) throws TaskProtocolException
{
- return null;
- }
-
- @Override
- public void handle(IMetaTaskTransaction wi) throws
TaskProtocolException {
- handleMetaTaskTransation(wi);
-
- }
- private void handleMetaTaskTransation(IMetaTaskTransaction trans) {
- try {
- trans.setResponseHints(new ArrayList<Hint>());
-
- TaskConsumer taskConsumer =
- new WiTaskConsumer(trans);
-
- MessageBuffer mb = new MessageBuffer();
-
mb.append(Standardize.Label.remote.get()+taskConsumer.toString());
- mb.append(Standardize.Label.type.get()+trans.getType());
- Type type = trans.getType();
- switch(type) {
- case Get:
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"---- Driver
handling GET Request - Requestor:"+taskConsumer.toString());
- }
- handleMetaTaskTransationGet(trans,
taskConsumer);
- break;
- case Ack:
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"---- Driver
handling ACK Request - Requestor:"+taskConsumer.toString());
- }
- handleMetaTaskTransationAck(trans,
taskConsumer);
- break;
- case End:
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"---- Driver
handling END Request - Requestor:"+taskConsumer.toString());
- }
- handleMetaTaskTransationEnd(trans,
taskConsumer);
- break;
- case InvestmentReset:
- // handleMetaCasTransationInvestmentReset(trans,
rwt);
- break;
- default:
- break;
- }
- IMetaTask metaCas = trans.getMetaTask();
- if(metaCas != null) {
- metaCas.setPerformanceMetrics(null);
- metaCas.setUserSpaceException(null);
- }
- }
- catch(Exception e) {
- logger.log(Level.WARNING,"Error",e);
- }
- finally {
- }
- }
-
- @Override
- public String start() throws Exception {
- running = true;
- return null;
- }
-
- @Override
- public void stop() throws Exception {
- running = false;
- }
- private void handleMetaTaskTransationGet(IMetaTaskTransaction trans,
TaskConsumer taskConsumer) {
- IMetaMetaTask mmc = getMetaMetaTask(taskConsumer);
- trans.setMetaTask( mmc.getMetaCas());
- }
- private IMetaTask getMetaTask(String serializedCas) {
- if ( serializedCas == null ) {
- return null;
- }
- return new MetaTask(atomicCounter.incrementAndGet(), "",
serializedCas);
- }
-
- private synchronized IMetaMetaTask getMetaMetaTask(TaskConsumer
taskConsumer) {
- IMetaMetaTask mmc = new MetaMetaTask();
- ServiceDriver sd = DuccServiceDriver.getInstance();
- TaskAllocatorCallbackListener taskAllocator =
- sd.getTaskAllocator();
- ITask task;
-
- task = taskAllocator.getTask(taskConsumer);
- if ( task != null && !task.isEmpty() ) {
- IMetaTask metaTask = getMetaTask(task.asString());
- mmc.setMetaCas(metaTask);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"Returning TASK with
appdata:"+task.getMetadata()+" to the service");
- }
- mmc.getMetaCas().setAppData(task.getMetadata());
- }
- return mmc;
- }
- private void handleMetaTaskTransationAck(IMetaTaskTransaction trans,
TaskConsumer taskConsumer) {
-
- }
-
- private void handleMetaTaskTransationEnd(IMetaTaskTransaction trans,
TaskConsumer taskConsumer) {
- ServiceDriver sd = DuccServiceDriver.getInstance();
- TaskAllocatorCallbackListener taskAllocator =
- sd.getTaskAllocator();
- if ( trans.getMetaTask().getUserSpaceException() != null ) {
- // The service returns a stringified stacktrace ... not an
Exception
- String exceptionAsString = (String)
trans.getMetaTask().getUserSpaceException();
- taskAllocator.onTaskFailure( taskConsumer,
trans.getMetaTask().getAppData(), exceptionAsString );
-
- } else {
- String m =
-
trans.getMetaTask().getPerformanceMetrics();
- if ( logger.isLoggable(Level.FINE)) {
-
logger.log(Level.FINE,"handleMetaTaskTransationEnd()...........
appdata:"+trans.getMetaTask().getAppData());
- }
- taskAllocator.onTaskSuccess(
taskConsumer,trans.getMetaTask().getAppData(), m );
- }
- }
+ Logger logger =
UIMAFramework.getLogger(DuccServiceTaskProtocolHandler.class);
- public static void main(String[] args) {
- // TODO Auto-generated method stub
+ private volatile boolean running = true;
- }
+ private static AtomicInteger atomicCounter = new AtomicInteger(0);
+
+ private static AtomicInteger nHandles = new AtomicInteger(0);
+
+ public DuccServiceTaskProtocolHandler(TaskAllocatorCallbackListener
taskAllocator) {
+ }
+
+ @Override
+ public String initialize(Properties props) throws TaskProtocolException {
+ return null;
+ }
+
+ @Override
+ public void handle(IMetaTaskTransaction wi) throws TaskProtocolException {
+ handleMetaTaskTransation(wi);
+ }
+
+ private void handleMetaTaskTransation(IMetaTaskTransaction trans) {
+ try {
+ trans.setResponseHints(new ArrayList<Hint>());
+
+ TaskConsumer taskConsumer = new WiTaskConsumer(trans);
+
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.remote.get() + taskConsumer.toString());
+ mb.append(Standardize.Label.type.get() + trans.getType());
+ Type type = trans.getType();
+ switch (type) {
+ case Get:
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,
+ "---- Driver handling GET Request - Requestor:" +
taskConsumer.toString());
+ }
+ handleMetaTaskTransationGet(trans, taskConsumer);
+ break;
+ case Ack:
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,
+ "---- Driver handling ACK Request - Requestor:" +
taskConsumer.toString());
+ }
+ handleMetaTaskTransationAck(trans, taskConsumer);
+ break;
+ case End:
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,
+ "---- Driver handling END Request - Requestor:" +
taskConsumer.toString());
+ }
+ handleMetaTaskTransationEnd(trans, taskConsumer);
+ break;
+ case InvestmentReset:
+ // handleMetaCasTransationInvestmentReset(trans, rwt);
+ break;
+ default:
+ break;
+ }
+ IMetaTask metaCas = trans.getMetaTask();
+ if (metaCas != null) {
+ metaCas.setPerformanceMetrics(null);
+ metaCas.setUserSpaceException(null);
+ }
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Error", e);
+ } finally {
+ }
+ }
+
+ @Override
+ public String start() throws Exception {
+ running = true;
+ return null;
+ }
+
+ @Override
+ public void stop() throws Exception {
+ running = false;
+ }
+
+ private void handleMetaTaskTransationGet(IMetaTaskTransaction trans,
TaskConsumer taskConsumer) {
+ IMetaMetaTask mmc = getMetaMetaTask(taskConsumer);
+ trans.setMetaTask(mmc.getMetaCas());
+ }
+
+ private IMetaTask getMetaTask(String serializedCas) {
+ if (serializedCas == null) {
+ return null;
+ }
+ return new MetaTask(atomicCounter.incrementAndGet(), "", serializedCas);
+ }
+
+ private IMetaMetaTask getMetaMetaTask(TaskConsumer taskConsumer) {
+
+ IMetaMetaTask mmc = new MetaMetaTask();
+ ServiceDriver sd = DuccServiceDriver.getInstance();
+ TaskAllocatorCallbackListener taskAllocator = sd.getTaskAllocator();
+ ITask task;
+
+ task = taskAllocator.getTask(taskConsumer);
+ if (task != null && !task.isEmpty()) {
+ IMetaTask metaTask = getMetaTask(task.asString());
+ mmc.setMetaCas(metaTask);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE,
+ "Returning TASK with appdata:" + task.getMetadata() + " to the
service");
+ }
+ mmc.getMetaCas().setAppData(task.getMetadata());
+ }
+ return mmc;
+ }
+
+ private void handleMetaTaskTransationAck(IMetaTaskTransaction trans,
TaskConsumer taskConsumer) {
+
+ }
+
+ private void handleMetaTaskTransationEnd(IMetaTaskTransaction trans,
TaskConsumer taskConsumer) {
+ ServiceDriver sd = DuccServiceDriver.getInstance();
+ TaskAllocatorCallbackListener taskAllocator = sd.getTaskAllocator();
+ if (trans.getMetaTask().getUserSpaceException() != null) {
+ // The service returns a stringified stacktrace ... not an Exception
+ String exceptionAsString = (String)
trans.getMetaTask().getUserSpaceException();
+ taskAllocator.onTaskFailure(taskConsumer,
trans.getMetaTask().getAppData(),
+ exceptionAsString);
+
+ } else {
+ String m = trans.getMetaTask().getPerformanceMetrics();
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "handleMetaTaskTransationEnd()...........
appdata:"
+ + trans.getMetaTask().getAppData());
+ }
+ taskAllocator.onTaskSuccess(taskConsumer,
trans.getMetaTask().getAppData(), m);
+ }
+ }
+
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
+
+ }
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/sd/task/transport/HttpTaskTransportHandler.java
Wed May 1 13:42:39 2019
@@ -22,6 +22,7 @@ package org.apache.uima.ducc.ps.sd.task.
import java.io.BufferedReader;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.HashMap;
import java.util.Properties;
import javax.servlet.ServletException;
@@ -47,218 +48,231 @@ import org.eclipse.jetty.servlet.Servlet
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import com.thoughtworks.xstream.XStream;
+
public class HttpTaskTransportHandler implements TaskTransportHandler {
- Logger logger = UIMAFramework.getLogger(HttpTaskTransportHandler.class);
- // Jetty
- private Server server = null;
- // Delegate to handle incoming messages
- private TaskProtocolHandler taskProtocolHandler = null;
- private volatile boolean running = false;
- // mux is used to synchronize start()
- private Object mux = new Object();
-
- public HttpTaskTransportHandler() {
- }
-
- public void setTaskProtocolHandler(TaskProtocolHandler
taskProtocolHandler) {
- this.taskProtocolHandler = taskProtocolHandler;
- }
-
- public String start() throws Exception {
- synchronized( mux ) {
- if ( !running ) {
- if ( taskProtocolHandler == null ) {
- throw new
TaskProtocolException("start() called before initialize() - task protocol
handler not started");
- }
- if ( server == null ) {
- throw new
TaskProtocolException("start() called before initialize() - Jetty not started
yet");
- }
-
- server.start();
- logger.log(Level.INFO, "Jetty Started - Waiting
for Messages ...");
- running = true;
- }
- }
- return "";
- }
-
- public void stop() throws Exception {
- synchronized( mux ) {
- if ( server != null && server.isRunning() ) {
- server.stop();
- }
- }
- }
+ Logger logger = UIMAFramework.getLogger(HttpTaskTransportHandler.class);
+
+ // Jetty
+ private Server server = null;
+
+ // Delegate to handle incoming messages
+ private TaskProtocolHandler taskProtocolHandler = null;
+
+ private volatile boolean running = false;
+
+ // mux is used to synchronize start() and stop()
+ private Object mux = new Object();
+
+ // Create ThreadLocal Map containing instances of XStream for each thread
+ private ThreadLocal<HashMap<Long, XStream>> threadLocalXStream = new
ThreadLocal<HashMap<Long, XStream>>() {
+ @Override
+ protected HashMap<Long, XStream> initialValue() {
+ return new HashMap<Long, XStream>();
+ }
+ };
+
+ public HttpTaskTransportHandler() {
+ }
- public Server createServer(int httpPort, int maxThreads, String app,
TaskProtocolHandler handler)
+ public void setTaskProtocolHandler(TaskProtocolHandler taskProtocolHandler) {
+ this.taskProtocolHandler = taskProtocolHandler;
+ }
+
+ public String start() throws Exception {
+ synchronized (mux) {
+ if (!running) {
+ if (taskProtocolHandler == null) {
+ throw new TaskProtocolException(
+ "start() called before initialize() - task protocol handler
not started");
+ }
+ if (server == null) {
+ throw new TaskProtocolException(
+ "start() called before initialize() - Jetty not started
yet");
+ }
+
+ server.start();
+ logger.log(Level.INFO, "Jetty Started - Waiting for Messages ...");
+ running = true;
+ }
+ }
+ return "";
+ }
+
+ public void stop() throws Exception {
+ synchronized (mux) {
+ if (server != null && server.isRunning()) {
+ server.stop();
+ }
+ }
+ }
+
+ public Server createServer(int httpPort, int maxThreads, String app,
TaskProtocolHandler handler)
throws Exception {
- // Server thread pool
- QueuedThreadPool threadPool = new QueuedThreadPool();
- if (maxThreads < threadPool.getMinThreads()) {
- // logger.warn("JobDriver", jobid,
- // "Invalid value for jetty MaxThreads("+maxThreads+")
- it should be greater or equal to "+threadPool.getMinThreads()+". Defaulting
to jettyMaxThreads="+threadPool.getMaxThreads());
- threadPool.setMaxThreads(threadPool.getMinThreads());
- } else {
- threadPool.setMaxThreads(maxThreads);
- }
-
- Server server = new Server(threadPool);
-
- // Server connector
- ServerConnector connector = new ServerConnector(server);
- connector.setPort(httpPort);
- server.setConnectors(new Connector[] { connector });
-
- ServletContextHandler context = new ServletContextHandler(
- ServletContextHandler.SESSIONS);
- context.setContextPath("/");
- server.setHandler(context);
-
- context.addServlet(new ServletHolder(new
TaskHandlerServlet(handler)), "/"+app);
-
- return server;
- }
-
- public int findFreePort() {
- ServerSocket socket = null;
- int port = 0;
- try {
- // by passing 0 as an arg, let ServerSocket choose an arbitrary
- // port that is available.
- socket = new ServerSocket(0);
- port = socket.getLocalPort();
- } catch (IOException e) {
- } finally {
- try {
- // Clean up
- if (socket != null) {
- socket.close();
- }
- } catch( Exception ex) {
- ex.printStackTrace();
- }
- }
- return port;
- }
- @Override
- public String initialize(Properties properties) throws
TaskTransportException {
- // Max cores
+ // Server thread pool
+ QueuedThreadPool threadPool = new QueuedThreadPool();
+ if (maxThreads < threadPool.getMinThreads()) {
+ // logger.warn("JobDriver", jobid,
+ // "Invalid value for jetty MaxThreads("+maxThreads+") - it should be
greater or equal to
+ // "+threadPool.getMinThreads()+". Defaulting to
+ // jettyMaxThreads="+threadPool.getMaxThreads());
+ threadPool.setMaxThreads(threadPool.getMinThreads());
+ } else {
+ threadPool.setMaxThreads(maxThreads);
+ }
+
+ Server server = new Server(threadPool);
+
+ // Server connector
+ ServerConnector connector = new ServerConnector(server);
+ connector.setPort(httpPort);
+ server.setConnectors(new Connector[] { connector });
+
+ ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+ server.setHandler(context);
+
+ context.addServlet(new ServletHolder(new TaskHandlerServlet(handler)), "/"
+ app);
+
+ return server;
+ }
+
+ public int findFreePort() {
+ ServerSocket socket = null;
+ int port = 0;
+ try {
+ // by passing 0 as an arg, let ServerSocket choose an arbitrary
+ // port that is available.
+ socket = new ServerSocket(0);
+ port = socket.getLocalPort();
+ } catch (IOException e) {
+ } finally {
+ try {
+ // Clean up
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ return port;
+ }
+
+ @Override
+ public String initialize(Properties properties) throws
TaskTransportException {
+ // Max cores
int cores = Runtime.getRuntime().availableProcessors();
String maxThreadsString = (String)
properties.get(ServiceDriver.MaxThreads);
String appName = (String) properties.get(ServiceDriver.Application);
- int maxThreads = cores;
- int httpPort = 0;
- if (maxThreadsString != null) {
- try {
- maxThreads =
Integer.parseInt(maxThreadsString.trim());
- } catch (NumberFormatException e) {
- logger.log(Level.WARNING,"Error",e);
- }
- }
- if (cores > maxThreads) {
- maxThreads = cores;
- }
-
- String portString = (String) properties.get(ServiceDriver.Port);
- if (portString != null) {
- try {
- httpPort = Integer.parseInt(portString.trim());
- } catch (NumberFormatException e) {
- logger.log(Level.WARNING,"Error",e);
- throw new TaskTransportException("Unable to
start Server using provided port:"+httpPort);
- }
- }
- if (httpPort == 0) { // Use any free port if none or 0 specified
+ int maxThreads = cores;
+ int httpPort = 0;
+ if (maxThreadsString != null) {
+ try {
+ maxThreads = Integer.parseInt(maxThreadsString.trim());
+ } catch (NumberFormatException e) {
+ logger.log(Level.WARNING, "Error", e);
+ }
+ }
+ if (cores > maxThreads) {
+ maxThreads = cores;
+ }
+
+ String portString = (String) properties.get(ServiceDriver.Port);
+ if (portString != null) {
+ try {
+ httpPort = Integer.parseInt(portString.trim());
+ } catch (NumberFormatException e) {
+ logger.log(Level.WARNING, "Error", e);
+ throw new TaskTransportException("Unable to start Server using
provided port:" + httpPort);
+ }
+ }
+ if (httpPort == 0) { // Use any free port if none or 0 specified
httpPort = findFreePort();
}
- if (appName == null) {
- appName = "test";
- logger.log(Level.WARNING, "The "+ServiceDriver.Application+"
property is not specified - using "+appName);
- }
- try {
- // create and initialize Jetty Server
- server = createServer(httpPort, maxThreads, appName,
taskProtocolHandler);
- } catch (Exception e) {
- throw new TaskTransportException(e);
- }
-
- // Establish the URL we could register for our customers
+ if (appName == null) {
+ appName = "test";
+ logger.log(Level.WARNING,
+ "The " + ServiceDriver.Application + " property is not specified
- using " + appName);
+ }
+ try {
+ // create and initialize Jetty Server
+ server = createServer(httpPort, maxThreads, appName,
taskProtocolHandler);
+ } catch (Exception e) {
+ throw new TaskTransportException(e);
+ }
+
+ // Establish the URL we could register for our customers
String taskUrl = server.getURI().toString();
if (taskUrl.endsWith("/")) {
taskUrl = taskUrl.substring(0, taskUrl.length() - 1);
}
taskUrl += ":" + httpPort + "/" + appName;
- logger.log(Level.INFO, "Service Driver URL: " + taskUrl);
-
+ logger.log(Level.INFO, "Service Driver URL: " + taskUrl);
+
return taskUrl;
- }
+ }
+
+ public class TaskHandlerServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ TaskProtocolHandler taskProtocolHandler = null;
+ public TaskHandlerServlet(TaskProtocolHandler handler) {
+ this.taskProtocolHandler = handler;
+ }
+
+ protected void doPost(HttpServletRequest request, HttpServletResponse
response)
+ throws ServletException, IOException {
+ try {
+ StringBuilder sb = new StringBuilder();
+ BufferedReader reader = request.getReader();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line);
+ }
+ String content = sb.toString().trim();
+ // check ThreadLocal for a Map entry for this thread id. If not found,
create
+ // dedicated XStream instance for this thread which will be used to
serialize/deserialize
+ // this thread's tasks
+
+ if (threadLocalXStream.get().get(Thread.currentThread().getId()) ==
null) {
+ threadLocalXStream.get().put(Thread.currentThread().getId(),
+ XStreamUtils.getXStreamInstance());
+ }
+
+ IMetaTaskTransaction imt = null;
+
+ // imt = (IMetaTaskTransaction) XStreamUtils.unmarshall(content);
+ // Use dedicated instance of XStream to deserialize request
+ imt = (IMetaTaskTransaction)
threadLocalXStream.get().get(Thread.currentThread().getId())
+ .fromXML(content);
+
+ // process service request
+ taskProtocolHandler.handle(imt);
+
+ // setup reply
+ imt.setDirection(Direction.Response);
+
+ response.setStatus(HttpServletResponse.SC_OK);
+
+ response.setHeader("content-type", "text/xml");
+ // String body = XStreamUtils.marshall(imt);
+ // Use dedicated instance of XStream to serialize reply
+ String body =
threadLocalXStream.get().get(Thread.currentThread().getId()).toXML(imt);
+ response.getWriter().write(body);
+
+ } catch (Throwable e) {
+ throw new ServletException(e);
+ }
+ }
+ }
- public class TaskHandlerServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
- TaskProtocolHandler taskProtocolHandler = null;
-
- public TaskHandlerServlet(TaskProtocolHandler handler) {
- this.taskProtocolHandler = handler;
- }
-
- protected void doPost(HttpServletRequest request,
- HttpServletResponse response) throws
ServletException,
- IOException {
- try {
- //long post_stime = System.nanoTime();
- StringBuilder sb = new StringBuilder();
- BufferedReader reader = request.getReader();
- String line;
- while ((line = reader.readLine()) != null) {
- sb.append(line);
- }
- // char[] content = new
char[request.getContentLength()];
- String content = sb.toString().trim();
-
- // char[] content = new
char[request.getContentLength()];
-
- // request.getReader().read(content);
- // logger.debug("doPost",jobid,
- // "Http Request
Body:::"+String.valueOf(content));
-
- IMetaTaskTransaction imt = null;
- // String t = String.valueOf(content);
-
- // imt = (IMetaCasTransaction) XStreamUtils
- // .unmarshall(t.trim());
- imt = (IMetaTaskTransaction)
XStreamUtils.unmarshall(content);
- //MessageHandler.accumulateTimes("Unmarshall",
post_stime);
-
- // process service request
- taskProtocolHandler.handle(imt);
-
- //long marshall_stime = System.nanoTime();
- // setup reply
- imt.setDirection(Direction.Response);
-
- response.setStatus(HttpServletResponse.SC_OK);
-
- response.setHeader("content-type", "text/xml");
- String body = XStreamUtils.marshall(imt);
-
- response.getWriter().write(body);
-
- // When debugging accumulate times taken by
each stage of the
- // message processing
- // MessageHandler.accumulateTimes("Marshall",
marshall_stime);
- // MessageHandler.accumulateTimes("Post",
post_stime);
- } catch (Throwable e) {
- throw new ServletException(e);
- }
- }
-
- }
- public static void main(String[] args) {
- // TODO Auto-generated method stub
+ public static void main(String[] args) {
+ // TODO Auto-generated method stub
- }
+ }
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
Wed May 1 13:42:39 2019
@@ -1,27 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.uima.ducc.ps.service.protocol.builtin;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,430 +47,502 @@ import org.apache.uima.ducc.ps.service.u
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
+import com.thoughtworks.xstream.XStream;
+
/**
- *
+ *
* This protocol handler is a Runnable
- *
+ *
*/
public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
- Logger logger =
UIMAFramework.getLogger(DefaultServiceProtocolHandler.class);
- private volatile boolean initError = false;
- private volatile boolean running = false;
- private volatile boolean quiescing = false;
- private IServiceTransport transport;
- private IServiceProcessor processor;
- private INoTaskAvailableStrategy noTaskStrategy;
- // each process thread will count down the latch after intialization
- private CountDownLatch initLatch;
- // this PH will count the stopLatch down when it is about to stop. The
service
- // is the owner of this latch and awaits termination blocking in start()
- private CountDownLatch stopLatch;
- // each process thread block on startLatch until application calls
start()
- private CountDownLatch startLatch;
- // reference to a service so that stop() can be called
- private IService service;
- // forces process threads to initialize serially
- private static ReentrantLock initLock = new ReentrantLock();
-
- private static AtomicInteger idGenerator = new AtomicInteger();
-
- private Thread retryThread = null;
-
- private DefaultServiceProtocolHandler(Builder builder) {
- this.initLatch = builder.initLatch;
- this.stopLatch = builder.stopLatch;
- this.service = builder.service;
- this.transport = builder.transport;
- this.processor = builder.processor;
- this.noTaskStrategy = builder.strategy;
- }
-
- private void waitForAllThreadsToInitialize() {
- try {
- initLatch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- }
-
- private void initialize() throws ServiceInitializationException {
- // this latch blocks all process threads after initialization
- // until application calls start()
- startLatch = new CountDownLatch(1);
- try {
- // use a lock to serialize initialization one thread at
a time
- initLock.lock();
- if (initError) {
- return;
- }
- processor.initialize();
- } catch (Throwable e) {
- initError = true;
- running = false;
- e.printStackTrace();
- logger.log(Level.WARNING, "ProtocolHandler initialize()
failed -",e);
- throw new ServiceInitializationException(
- "Thread:" +
Thread.currentThread().getName() + " Failed initialization - "+ e);
- } finally {
-
- initLatch.countDown();
- initLock.unlock();
- if (!initError) {
- // wait on startLatch
- waitForAllThreadsToInitialize();
- }
- }
- }
- public boolean initialized() {
- return ( initError==false );
- }
- private IMetaTaskTransaction sendAndReceive(IMetaTaskTransaction
transaction) throws Exception {
- TransactionId tid;
- if (Type.Get.equals(transaction.getType())) {
- int major = idGenerator.addAndGet(1);
- int minor = 0;
-
- tid = new TransactionId(major, minor);
- } else {
- tid = transaction.getTransactionId();
- // increment minor
- tid.next();
- }
-
- transaction.setRequesterProcessName(service.getType());
- transport.addRequestorInfo(transaction);
- IMetaTaskTransaction reply = null;
- try {
- // XStream is thread safe so multiple threads can
serialize concurrently
- String body = XStreamUtils.marshall(transaction);
- // dispatch implements waiting if no task is given by
the driver
- reply = transport.dispatch(body);
-
- if ( Objects.isNull(reply) ) {
- throw new TransportException("Received invalid
content (null) in response from client - rejecting request");
- }
-
- } catch ( Exception e) {
- if ( !running ) {
- throw new TransportException("Service stopping
- rejecting request");
- }
- throw e;
- }
- return reply;
- }
-
- private IMetaTaskTransaction callEnd(IMetaTaskTransaction transaction)
throws Exception {
- transaction.setType(Type.End);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "ProtocolHandler calling END");
- }
- return sendAndReceive(transaction);
-
- }
-
- private IMetaTaskTransaction callAck(IMetaTaskTransaction transaction)
throws Exception {
- transaction.setType(Type.Ack);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "ProtocolHandler calling ACK");
- }
- return sendAndReceive(transaction);
- }
- /**
- * Fetch new task from a remote driver. This method is synchronized to
prevent overrunning the
- * driver when a service scales up (many threads) and out (many
instances). Only one thread
- * at a time is allowed to pull tasks per service instance.
- *
- * When the driver is out of tasks, a single thread first sleeps for
awhile and than tries
- * again until a task is returned.
- *
- * @param transaction
- * @return
- * @throws Exception
- */
- private synchronized IMetaTaskTransaction callGet(IMetaTaskTransaction
transaction) throws Exception {
- transaction.setType(Type.Get);
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "ProtocolHandler calling GET");
- }
- IMetaTaskTransaction metaTransaction=null;
- boolean logOutOfTasks = true;
- while(running) {
- metaTransaction = sendAndReceive(transaction);
- // check if driver is out of tasks
- if ( metaTransaction.getMetaTask() == null ||
metaTransaction.getMetaTask().getUserSpaceTask() == null) {
- if ( logOutOfTasks ) {
- if ( logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE,"Process
Thread:"+Thread.currentThread().getId()+" - Driver is out of tasks - waiting
for awhile ("+noTaskStrategy.getWaitTimeInMillis()+" ms) and will try again ");
- }
- logOutOfTasks = false;
- }
- noTaskStrategy.handleNoTaskSupplied();
- } else {
- // Got a task
- break;
- }
- }
- return metaTransaction;
- }
- /**
- * Block until service start() is called
- *
- * @throws ServiceInitializationException
- */
- private void awaitStart() throws ServiceInitializationException {
- try {
- startLatch.await();
- } catch(InterruptedException e ) {
- Thread.currentThread().interrupt();
- throw new ServiceInitializationException("Thread
interrupted while awaiting start()");
- }
- }
-
-
- public String call() throws ServiceInitializationException,
ServiceException {
- // we may fail in initialize() in which case the
ServiceInitializationException
- // is thrown
- initialize();
-
- // now wait for application to call start
- awaitStart();
-
- // all threads intialized, enter running state
-
- IMetaTaskTransaction transaction = null;
-
- if ( logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, ".............. Thread
"+Thread.currentThread().getId() + " ready to process");
- }
-
-
- while (running) {
-
- try {
- // send GET Request
- transaction = callGet(new
MetaTaskTransaction());
- // the code may have blocked in callGet for
awhile, so check
- // if service is still running. If this service
is in quiescing
- // mode, finish processing current task. The
while-loop will
- // terminate when the task is finished.
- if ( !running && !quiescing ) {
- break;
- }
- // transaction may be null if
retryUntilSuccessfull was interrupted
- // due to stop
- if (Objects.isNull(transaction) || (!running
&& !quiescing)) {
- break;
- }
- logger.log(Level.FINE, ".............. Thread
"+Thread.currentThread().getId() + " processing new task");
- if ( Objects.isNull(transaction.getMetaTask())
) {
- // this should only be the case when
the service is stopping and transport is shutdown
- if ( running ) {
- logger.log(Level.INFO, "..............
Thread "+Thread.currentThread().getId() + " GET returned null MetaTask while
service is in a running state - this is unexpected");
- }
- // if !running, the while loop above will
terminate
- continue;
- }
-
- Object task =
transaction.getMetaTask().getUserSpaceTask();
-
- // send ACK
- transaction = callAck(transaction);
- if (!running && !quiescing ) {
- break;
- }
- IProcessResult processResult =
processor.process((String) task);
-
- // assume success
- Action action = Action.CONTINUE;
- // check if process error occurred.
- String errorAsString = processResult.getError();
-
- if (processResult.terminateProcess()) {
- action = Action.TERMINATE;
- } else if ( Objects.isNull(errorAsString)){
- // success
-
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
- }
- if ( Objects.nonNull(errorAsString ) ) {
- IMetaTask mc =
transaction.getMetaTask();
- // the ducc.deploy.JpType is only
present for jobs. If not specified
- // we return stringified exception to
the client. The JD expects
- // Java Exception object for its error
handling
- if (
Objects.isNull(System.getProperty("ducc.deploy.JpType")) ) {
-
-
mc.setUserSpaceException(errorAsString);
- } else {
- logger.log(Level.INFO, "Sending
Exception to JD:\n" +
-
((Exception)processResult.getExceptionObject()));
- // JD expects serialized
exception as byte[]
-
mc.setUserSpaceException(serializeError(processResult.getExceptionObject()));
- }
-
- }
-
- // send END Request
- callEnd(transaction);
- if (running && Action.TERMINATE.equals(action))
{
- logger.log(Level.WARNING, "Processor
Failure - Action=Terminate");
- // Can't stop using the current thread.
This thread
- // came from a thread pool we want to
stop. Need
- // a new/independent thread to call
stop()
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- delegateStop();
- }
- }).start();
- running = false;
- }
-
-
-
- } catch( IllegalStateException e) {
- break;
- } catch( TransportException e) {
- break;
- }
- catch (Exception e) {
-
- logger.log(Level.WARNING,"",e);
- }
- }
- stopLatch.countDown();
- System.out.println(Utils.getTimestamp()+">>>>>>>
"+Utils.getShortClassname(this.getClass())+".call() >>>>>>>>>> Thread
["+Thread.currentThread().getId()+"] "+ " ProtocolHandler stopped requesting
new tasks - Stopping processor");
- logger.log(Level.INFO,"ProtocolHandler stopped requesting new
tasks - Stopping processor");
-
- if ( processor != null ) {
- processor.stop();
- }
- return String.valueOf(Thread.currentThread().getId());
- }
-
- private byte[] serializeError(Throwable t) throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
-
- try {
- oos.writeObject(t);
- } catch (Exception e) {
- try {
- logger.log(Level.WARNING, "Unable to Serialize
"+t.getClass().getName()+" - Will Stringify It Instead");
-
- } catch( Exception ee) {}
- throw e;
- } finally {
- oos.close();
- }
-
- return baos.toByteArray();
- }
- private void delegateStop() {
- service.quiesceAndStop();
- }
- @Override
- public void stop() {
- quiescing = false;
- running = false;
- try {
- // use try catch to handle a possible race condition
- // when retryThread is not null, but it becomes null
- // before we call interrupt causing NPE. All this would
- // mean is that retryUntilSuccess() succeeded.
- if ( retryThread != null ) {
- retryThread.interrupt();
- }
- } catch( Exception ee) {
- } //noTaskStrategy.interrupt();
- if ( logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, this.getClass().getName()+"
stop() called");
- }
- }
- @Override
- public void quiesceAndStop() {
-
- // Use System.out since the logger's ShutdownHook may have
closed streams
- System.out.println(Utils.getTimestamp()+">>>>>>>
"+Utils.getShortClassname(this.getClass())+".queisceAndStop()");
- logger.log(Level.INFO, this.getClass().getName()+"
quiesceAndStop() called");
- // change state of transport to not running but keep connection
open
- // so that other threads can quiesce (send results)
- transport.stop(true);
-
- quiescing = true;
- running = false;
- try {
- // use try catch to handle a possible race condition
- // when retryThread is not null, but it becomes null
- // before we call interrupt causing NPE. All this would
- // mean is that retryUntilSuccess() succeeded.
- if ( retryThread != null ) {
- retryThread.interrupt();
- }
- } catch( Exception ee) {
- }
- try {
- // wait for process threads to terminate
- stopLatch.await();
- } catch( Exception e ) {
-
- }
- // Use System.out since the logger's ShutdownHook may have
closed streams
- System.out.println(Utils.getTimestamp()+">>>>>>>
"+Utils.getShortClassname(this.getClass())+".queisceAndStop() All process
threads completed quiesce");
- logger.log(Level.INFO, this.getClass().getName()+" All process
threads completed quiesce");
- }
- @Override
- public void start() {
- running = true;
- // process threads are initialized and are awaiting latch
countdown
- startLatch.countDown();
- }
- @Override
- public void setServiceProcessor(IServiceProcessor processor) {
- this.processor = processor;
- }
-
- @Override
- public void setTransport(IServiceTransport transport) {
- this.transport = transport;
- }
-
-
- public static class Builder {
- private IServiceTransport transport;
- private IServiceProcessor processor;
- private INoTaskAvailableStrategy strategy;
- // each thread will count down the latch
- private CountDownLatch initLatch;
- private CountDownLatch stopLatch;
- private IService service;
-
- public Builder withTransport(IServiceTransport
transport) {
- this.transport = transport;
- return this;
- }
- public Builder withProcessor(IServiceProcessor
processor) {
- this.processor = processor;
- return this;
- }
- public Builder withInitCompleteLatch(CountDownLatch
initLatch) {
- this.initLatch = initLatch;
- return this;
- }
- public Builder withDoneLatch(CountDownLatch stopLatch) {
- this.stopLatch = stopLatch;
- return this;
- }
- public Builder
withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
- this.strategy = strategy;
- return this;
- }
- public Builder withService(IService service) {
- this.service = service;
- return this;
- }
- public DefaultServiceProtocolHandler build() {
- return new DefaultServiceProtocolHandler(this);
- }
- }
+ Logger logger = UIMAFramework.getLogger(DefaultServiceProtocolHandler.class);
+
+ private volatile boolean initError = false;
+
+ private volatile boolean running = false;
+
+ private volatile boolean quiescing = false;
+
+ private IServiceTransport transport;
+
+ private IServiceProcessor processor;
+
+ private INoTaskAvailableStrategy noTaskStrategy;
+
+ // each process thread will count down the latch after initialization
+ private CountDownLatch initLatch;
+
+ // this PH will count the stopLatch down when it is about to stop. The
service
+ // is the owner of this latch and awaits termination blocking in start()
+ private CountDownLatch stopLatch;
+
+ // each process thread blocks on startLatch until application calls start()
+ private CountDownLatch startLatch;
+
+ // reference to a service so that stop() can be called
+ private IService service;
+
+ // forces process threads to initialize serially
+ private static ReentrantLock initLock = new ReentrantLock();
+
+ private ReentrantLock noWorkLock = new ReentrantLock();
+
+ private static AtomicInteger idGenerator = new AtomicInteger();
+
+ private Thread retryThread = null;
+
+ // Create ThreadLocal Map containing instances of XStream for each thread
+ private ThreadLocal<HashMap<Long, XStream>> threadLocalXStream = new
ThreadLocal<HashMap<Long, XStream>>() {
+ @Override
+ protected HashMap<Long, XStream> initialValue() {
+ return new HashMap<>();
+ }
+ };
+
+ private DefaultServiceProtocolHandler(Builder builder) {
+ this.initLatch = builder.initLatch;
+ this.stopLatch = builder.stopLatch;
+ this.service = builder.service;
+ this.transport = builder.transport;
+ this.processor = builder.processor;
+ this.noTaskStrategy = builder.strategy;
+ }
+
+ private void waitForAllThreadsToInitialize() {
+ try {
+ initLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ private void initialize() throws ServiceInitializationException {
+ // this latch blocks all process threads after initialization
+ // until application calls start()
+ startLatch = new CountDownLatch(1);
+ try {
+ // use a lock to serialize initialization one thread at a time
+ initLock.lock();
+ if (initError) {
+ return;
+ }
+ processor.initialize();
+ } catch (Throwable e) {
+ initError = true;
+ running = false;
+ logger.log(Level.WARNING, "ProtocolHandler initialize() failed -", e);
+ throw new ServiceInitializationException(
+ "Thread:" + Thread.currentThread().getName() + " Failed
initialization - " + e);
+ } finally {
+
+ initLatch.countDown();
+ initLock.unlock();
+ if (!initError) {
+ // wait on startLatch
+ waitForAllThreadsToInitialize();
+ }
+ }
+ }
+
+ public boolean initialized() {
+ return (initError == false);
+ }
+
+ private IMetaTaskTransaction sendAndReceive(IMetaTaskTransaction
transaction) throws Exception {
+ TransactionId tid;
+ if (Type.Get.equals(transaction.getType())) {
+ int major = idGenerator.addAndGet(1);
+ int minor = 0;
+
+ tid = new TransactionId(major, minor);
+ } else {
+ tid = transaction.getTransactionId();
+ // increment minor
+ tid.next();
+ }
+
+ transaction.setRequesterProcessName(service.getType());
+ transport.addRequestorInfo(transaction);
+ IMetaTaskTransaction reply = null;
+ try {
+ // XStream is thread safe so multiple threads can serialize concurrently
+ // String body = XStreamUtils.marshall(transaction);
+ String body =
threadLocalXStream.get().get(Thread.currentThread().getId()).toXML(transaction);
+ // dispatch implements waiting if no task is given by the driver
+ reply = transport.dispatch(body, threadLocalXStream);
+
+ if (Objects.isNull(reply)) {
+ throw new TransportException(
+ "Received invalid content (null) in response from client -
rejecting request");
+ }
+
+ } catch (Exception e) {
+ if (!running) {
+ throw new TransportException("Service stopping - rejecting request");
+ }
+ throw e;
+ }
+ return reply;
+ }
+
+ private IMetaTaskTransaction callEnd(IMetaTaskTransaction transaction)
throws Exception {
+ transaction.setType(Type.End);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling END");
+ }
+ return sendAndReceive(transaction);
+
+ }
+
+ private IMetaTaskTransaction callAck(IMetaTaskTransaction transaction)
throws Exception {
+ transaction.setType(Type.Ack);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling ACK");
+ }
+ return sendAndReceive(transaction);
+ }
+
+ /**
+ * Fetch new task from a remote driver.
+ *
+ * When the driver is out of tasks, a single thread first sleeps for a while
and then tries again
+ * until a task is returned.
+ *
+ * @param transaction
+ * @return
+ * @throws Exception
+ */
+ private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction)
throws Exception {
+ transaction.setType(Type.Get);
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(Level.FINE, "ProtocolHandler calling GET");
+ }
+ IMetaTaskTransaction metaTransaction = null;
+
+ while (running) {
+ metaTransaction = sendAndReceive(transaction);
+ if (metaTransaction.getMetaTask() != null
+ && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
+ return metaTransaction;
+ }
+
+ // If this is the first thread to get the lock, poll for work and unlock when
work is found.
+ // If the lock is not acquired immediately, wait for the lock to be
released when
+ // work becomes available,
+ // then immediately release the lock and loop back to retry
+ boolean firstLocker = noWorkLock.tryLock();
+ if (!firstLocker) {
+ noWorkLock.lock();
+ noWorkLock.unlock();
+ continue;
+ }
+
+ // The first thread here holds the lock and sleeps before retrying
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, "Driver is out of tasks - waiting for "
+ + noTaskStrategy.getWaitTimeInMillis() + "ms before trying
again ");
+ }
+ while (running) {
+ noTaskStrategy.handleNoTaskSupplied();
+ metaTransaction = sendAndReceive(transaction);
+ if (metaTransaction.getMetaTask() != null
+ && metaTransaction.getMetaTask().getUserSpaceTask() != null) {
+ noWorkLock.unlock();
+ return metaTransaction;
+ }
+ }
+ }
+ ;
+
+ return metaTransaction; // When shutting down
+ }
+
+ /**
+ * Block until service start() is called
+ *
+ * @throws ServiceInitializationException
+ */
+ private void awaitStart() throws ServiceInitializationException {
+ try {
+ startLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ServiceInitializationException("Thread interrupted while
awaiting start()");
+ }
+ }
+
+ public String call() throws ServiceInitializationException, ServiceException
{
+ // we may fail in initialize() in which case the
ServiceInitializationException
+ // is thrown
+ initialize();
+
+ // now wait for application to call start
+ awaitStart();
+
+ // check ThreadLocal for a Map entry for this thread id. If not found,
create
+ // dedicated XStream instance for this thread which will be used to
serialize/deserialize
+ // this thread's tasks
+ if (threadLocalXStream.get().get(Thread.currentThread().getId()) == null) {
+ threadLocalXStream.get().put(Thread.currentThread().getId(),
+ XStreamUtils.getXStreamInstance());// new XStream(new
DomDriver()));
+ }
+ // all threads initialized, enter running state
+
+ IMetaTaskTransaction transaction = null;
+
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO,
+ ".............. Thread " + Thread.currentThread().getId() + "
ready to process");
+ }
+
+ while (running) {
+
+ try {
+ // send GET Request
+ transaction = callGet(new MetaTaskTransaction());
+ // the code may have blocked in callGet for awhile, so check
+ // if service is still running. If this service is in quiescing
+ // mode, finish processing current task. The while-loop will
+ // terminate when the task is finished.
+ if (!running && !quiescing) {
+ break;
+ }
+ // transaction may be null if retryUntilSuccessfull was interrupted
+ // due to stop
+ if (Objects.isNull(transaction) || (!running && !quiescing)) {
+ break;
+ }
+ logger.log(Level.FINE,
+ ".............. Thread " + Thread.currentThread().getId() + "
processing new task");
+ if (Objects.isNull(transaction.getMetaTask())) {
+ // this should only be the case when the service is stopping and
transport is
+ // shutdown
+ if (running) {
+ logger.log(Level.INFO, ".............. Thread " +
Thread.currentThread().getId()
+ + " GET returned null MetaTask while service is in a
running state - this is unexpected");
+ }
+ // if !running, the while loop above will terminate
+ continue;
+ }
+
+ Object task = transaction.getMetaTask().getUserSpaceTask();
+
+ // send ACK
+ transaction = callAck(transaction);
+ if (!running && !quiescing) {
+ break;
+ }
+ IProcessResult processResult = processor.process((String) task);
+
+ // assume success
+ Action action = Action.CONTINUE;
+ // check if process error occurred.
+ String errorAsString = processResult.getError();
+
+ if (processResult.terminateProcess()) {
+ action = Action.TERMINATE;
+ } else if (Objects.isNull(errorAsString)) {
+ // success
+
transaction.getMetaTask().setPerformanceMetrics(processResult.getResult());
+ }
+ if (Objects.nonNull(errorAsString)) {
+ IMetaTask mc = transaction.getMetaTask();
+ // the ducc.deploy.JpType is only present for jobs. If not specified
+ // we return stringified exception to the client. The JD expects
+ // Java Exception object for its error handling
+ if (Objects.isNull(System.getProperty("ducc.deploy.JpType"))) {
+
+ mc.setUserSpaceException(errorAsString);
+ } else {
+ logger.log(Level.INFO, "Sending Exception to JD:\n"
+ + ((Exception) processResult.getExceptionObject()));
+ // JD expects serialized exception as byte[]
+
mc.setUserSpaceException(serializeError(processResult.getExceptionObject()));
+ }
+
+ }
+
+ // send END Request
+ callEnd(transaction);
+ if (running && Action.TERMINATE.equals(action)) {
+ logger.log(Level.WARNING, "Processor Failure - Action=Terminate");
+ // Can't stop using the current thread. This thread
+ // came from a thread pool we want to stop. Need
+ // a new/independent thread to call stop()
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ delegateStop();
+ }
+ }).start();
+ running = false;
+ }
+
+ } catch (IllegalStateException e) {
+ break;
+ } catch (TransportException e) {
+ break;
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "", e);
+ break;
+ }
+ }
+ stopLatch.countDown();
+ System.out.println(Utils.getTimestamp() + ">>>>>>> " +
Utils.getShortClassname(this.getClass())
+ + ".call() >>>>>>>>>> Thread [" + Thread.currentThread().getId() +
"] "
+ + " ProtocolHandler stopped requesting new tasks - Stopping
processor");
+ logger.log(Level.INFO, "ProtocolHandler stopped requesting new tasks -
Stopping processor");
+
+ if (processor != null) {
+ processor.stop();
+ }
+ return String.valueOf(Thread.currentThread().getId());
+ }
+
+ private byte[] serializeError(Throwable t) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+ try {
+ oos.writeObject(t);
+ } catch (Exception e) {
+ try {
+ logger.log(Level.WARNING,
+ "Unable to Serialize " + t.getClass().getName() + " - Will
Stringify It Instead");
+
+ } catch (Exception ee) {
+ }
+ throw e;
+ } finally {
+ oos.close();
+ }
+
+ return baos.toByteArray();
+ }
+
+ private void delegateStop() {
+ service.quiesceAndStop();
+ }
+
+ @Override
+ public void stop() {
+ quiescing = false;
+ running = false;
+ try {
+ // use try catch to handle a possible race condition
+ // when retryThread is not null, but it becomes null
+ // before we call interrupt causing NPE. All this would
+ // mean is that retryUntilSuccess() succeeded.
+ if (retryThread != null) {
+ retryThread.interrupt();
+ }
+ } catch (Exception ee) {
+ } // noTaskStrategy.interrupt();
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, this.getClass().getName() + " stop() called");
+ }
+ }
+
+ @Override
+ public void quiesceAndStop() {
+
+ // Use System.out since the logger's ShutdownHook may have closed streams
+ System.out.println(Utils.getTimestamp() + ">>>>>>> " +
Utils.getShortClassname(this.getClass())
+ + ".queisceAndStop()");
+ logger.log(Level.INFO, this.getClass().getName() + " quiesceAndStop()
called");
+ // change state of transport to not running but keep connection open
+ // so that other threads can quiesce (send results)
+ transport.stop(true);
+
+ quiescing = true;
+ running = false;
+ try {
+ // use try catch to handle a possible race condition
+ // when retryThread is not null, but it becomes null
+ // before we call interrupt causing NPE. All this would
+ // mean is that retryUntilSuccess() succeeded.
+ if (retryThread != null) {
+ retryThread.interrupt();
+ }
+ } catch (Exception ee) {
+ }
+ try {
+ // wait for process threads to terminate
+ stopLatch.await();
+ } catch (Exception e) {
+
+ }
+ // Use System.out since the logger's ShutdownHook may have closed streams
+ System.out.println(Utils.getTimestamp() + ">>>>>>> " +
Utils.getShortClassname(this.getClass())
+ + ".queisceAndStop() All process threads completed quiesce");
+ logger.log(Level.INFO, this.getClass().getName() + " All process threads
completed quiesce");
+ }
+
+ @Override
+ public void start() {
+ running = true;
+ // process threads are initialized and are awaiting latch countdown
+ startLatch.countDown();
+ }
+
+ @Override
+ public void setServiceProcessor(IServiceProcessor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public void setTransport(IServiceTransport transport) {
+ this.transport = transport;
+ }
+
+ public static class Builder {
+ private IServiceTransport transport;
+
+ private IServiceProcessor processor;
+
+ private INoTaskAvailableStrategy strategy;
+
+ // each thread will count down the latch
+ private CountDownLatch initLatch;
+
+ private CountDownLatch stopLatch;
+
+ private IService service;
+
+ public Builder withTransport(IServiceTransport transport) {
+ this.transport = transport;
+ return this;
+ }
+
+ public Builder withProcessor(IServiceProcessor processor) {
+ this.processor = processor;
+ return this;
+ }
+
+ public Builder withInitCompleteLatch(CountDownLatch initLatch) {
+ this.initLatch = initLatch;
+ return this;
+ }
+
+ public Builder withDoneLatch(CountDownLatch stopLatch) {
+ this.stopLatch = stopLatch;
+ return this;
+ }
+
+ public Builder withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
+ this.strategy = strategy;
+ return this;
+ }
+
+ public Builder withService(IService service) {
+ this.service = service;
+ return this;
+ }
+
+ public DefaultServiceProtocolHandler build() {
+ return new DefaultServiceProtocolHandler(this);
+ }
+ }
}
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/IServiceTransport.java
Wed May 1 13:42:39 2019
@@ -18,14 +18,20 @@
*/
package org.apache.uima.ducc.ps.service.transport;
+import java.util.HashMap;
+
import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
import org.apache.uima.ducc.ps.service.IServiceComponent;
import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
+import com.thoughtworks.xstream.XStream;
+
public interface IServiceTransport extends IServiceComponent {
// called by Protocal Handler. Any errors will be handled
// by instance of IServiceErrorHandler
- public IMetaTaskTransaction dispatch(String request) throws
TransportException;
+// public IMetaTaskTransaction dispatch(String request) throws
TransportException;
+ public IMetaTaskTransaction dispatch(String serializedRequest,
ThreadLocal<HashMap<Long, XStream>> localXStream) throws TransportException;
+
// initialize transport
public void initialize() throws ServiceInitializationException;
// stop transport
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
URL:
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java?rev=1858488&r1=1858487&r2=1858488&view=diff
==============================================================================
---
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
(original)
+++
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/transport/XStreamUtils.java
Wed May 1 13:42:39 2019
@@ -45,4 +45,9 @@ public class XStreamUtils {
return xStream.fromXML(targetToUnmarshall);
}
}
+ public static XStream getXStreamInstance() {
+ XStream xStream = new XStream(new DomDriver());
+ initXStreanSecurity(xStream);
+ return xStream;
+ }
}