Author: burn
Date: Fri Jul 13 14:04:33 2018
New Revision: 1835841

URL: http://svn.apache.org/viewvc?rev=1835841&view=rev
Log:
UIMA-5822 Delay first work request

Modified:
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
    
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1835841&r1=1835840&r2=1835841&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
 Fri Jul 13 14:04:33 2018
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -67,10 +67,10 @@ public class PullService implements ISer
        private String clientURL;
        private IRegistryClient registryClient;
        // ******************************************
-       
+
        // internal error handler
        private IServiceErrorHandler errorHandler=null;
-       // 
+       //
        private IServiceMonitor serviceMonitor=null;
        // internal transport to communicate with remote client
        private IServiceTransport transport=null;
@@ -84,12 +84,12 @@ public class PullService implements ISer
        // holds Future to every process thread
        private List<Future<String>> threadHandleList =
                        new ArrayList<>();
-       
+
        private Lock initLock = new ReentrantLock();
-       
-       public PullService(String type) { 
+
+       public PullService(String type) {
                this.type = type;
-               
+
        }
        public String getType() {
                return type;
@@ -134,12 +134,12 @@ public class PullService implements ISer
                registryClient = new DefaultRegistryClient(target);
 
        }
-       
+
        @Override
        public void initialize() throws ServiceInitializationException {
                // only one thread can call this method
                initLock.lock();
-               
+
                try {
                        if ( initialized ) {
                                // Already initialized
@@ -179,13 +179,13 @@ public class PullService implements ISer
                                           .withDoneLatch(stopLatch)
                                           .withInitCompleteLatch(threadsReady)
                                           .build();
-                                          
-                       
-                       // first initialize Processors. The 
ServiceThreadFactory creates 
+
+
+                       // first initialize Processors. The 
ServiceThreadFactory creates
                        // as many threads as defined in 'scaleout'
-                       threadPool = 
+                       threadPool =
                                        new 
ScheduledThreadPoolExecutor(scaleout, new ServiceThreadFactory());
-                       
+
                // Create and start worker threads that pull Work Items from a 
client.
                        // Each worker thread calls processor.initialize() and 
counts down the
                        // 'threadsReady' latch. When all threads finish 
initializing they all
@@ -198,10 +198,10 @@ public class PullService implements ISer
 
                        initializeMonitor();
                        initializeTransport();
-                       
+
                        initialized = true;
-                       
-                       
+
+
                } catch( ServiceInitializationException e) {
                        throw e;
                } catch( InterruptedException e) {
@@ -210,13 +210,13 @@ public class PullService implements ISer
                        throw new ServiceInitializationException("Service 
interrupted during initialization - shutting down process threads");
                } catch( Exception e) {
                        throw new ServiceInitializationException("",e);
-               }  
+               }
                finally {
                        initLock.unlock();
                }
 
        }
-       
+
        @Override
        public void start() throws IllegalStateException, ExecutionException, 
ServiceException {
                if ( !initialized ) {
@@ -252,7 +252,7 @@ public class PullService implements ISer
                stopTransport();
                stopProtocolHandler();
                stopServiceProcessor();
-        // monitor should be stopped last to keep posting updates to observer  
        
+        // monitor should be stopped last to keep posting updates to observer
                stopMonitor();
        }
 
@@ -260,8 +260,8 @@ public class PullService implements ISer
                for (Future<String> future : threadHandleList) {
                        // print the return value of Future, notice the output 
delay in console
                        // because Future.get() waits for task to get completed
-                       logger.log(Level.INFO,
-                                       "Thread:" + 
Thread.currentThread().getName() + " Terminated " + new Date() + "::" + 
future.get());
+                       String result = future.get();
+                       logger.log(Level.INFO, "Thread:" + 
Thread.currentThread().getName() + " Terminated " + new Date() + "::" + result);
                }
        }
 
@@ -307,7 +307,7 @@ public class PullService implements ISer
                }
        }
        private void stopProtocolHandler() {
-               
+
        }
        private void stopTransport() {
                transport.stop();

Modified: 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1835841&r1=1835840&r2=1835841&view=diff
==============================================================================
--- 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 (original)
+++ 
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
 Fri Jul 13 14:04:33 2018
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,8 +44,8 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
 /**
- * 
- * This protocol handler is a Runnable 
+ *
+ * This protocol handler is a Runnable
  *
  */
 public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
@@ -66,13 +66,13 @@ public class DefaultServiceProtocolHandl
        private IService service;
        // forces process threads to initialize serially
        private ReentrantLock initLock = new ReentrantLock();
-       
+
        private static AtomicInteger idGenerator = new AtomicInteger();
 
-       
-       private DefaultServiceProtocolHandler(Builder builder) { 
-               this.initLatch = builder.initLatch; 
-               this.stopLatch = builder.stopLatch; 
+
+       private DefaultServiceProtocolHandler(Builder builder) {
+               this.initLatch = builder.initLatch;
+               this.stopLatch = builder.stopLatch;
                this.service = builder.service;
                this.transport = builder.transport;
                this.processor = builder.processor;
@@ -138,7 +138,7 @@ public class DefaultServiceProtocolHandl
                                throw new TransportException("Received invalid 
content (null) in response from client - rejecting request");
                        }
                        o = XStreamUtils.unmarshall(content);
-                       
+
                } catch ( Exception e) {
                        if ( !running ) {
                                throw new TransportException("Service stopping 
- rejecting request");
@@ -171,7 +171,7 @@ public class DefaultServiceProtocolHandl
        }
 
        private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) 
throws Exception {
-               transaction.setType(Type.Get); 
+               transaction.setType(Type.Get);
                if ( logger.isLoggable(Level.FINE)) {
                        logger.log(Level.FINE, "ProtocolHandler calling GET");
                }
@@ -179,7 +179,7 @@ public class DefaultServiceProtocolHandl
        }
        /**
         * Block until service start() is called
-        * 
+        *
         * @throws ServiceInitializationException
         */
        private void awaitStart() throws ServiceInitializationException {
@@ -194,19 +194,25 @@ public class DefaultServiceProtocolHandl
                // we may fail in initialize() in which case the 
ServiceInitializationException
                // is thrown
                initialize();
-               
+
                // now wait for application to call start
                awaitStart();
-               
+
                // all threads intialized, enter running state
 
                IMetaTaskTransaction transaction = null;
-               
+
                if ( logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, ".............. Thread 
"+Thread.currentThread().getId() + " ready to process");
                }
 
-               
+               logger.log(Level.INFO, "Wait for the initialized state to 
propagate to the SM " +
+                         "so any processing errors are not treates as 
initialization failures");
+               try {
+                       Thread.sleep(30000);
+               } catch (InterruptedException e1) {
+               };
+
                while (running) {
 
                        try {
@@ -218,13 +224,13 @@ public class DefaultServiceProtocolHandl
                                        break;
                                }
                                if (transaction.getMetaTask() == null || 
transaction.getMetaTask().getUserSpaceTask() == null ) {
-                                       // the client has no tasks to give. 
+                                       // the client has no tasks to give.
                                        noTaskStrategy.handleNoTaskSupplied();
                                        continue;
                                }
                                Object task = 
transaction.getMetaTask().getUserSpaceTask();
-                               
-                               // send ACK 
+
+                               // send ACK
                                transaction = callAck(transaction);
                                if (!running) {
                                        break;
@@ -259,8 +265,8 @@ public class DefaultServiceProtocolHandl
                                        }).start();
                                        running = false;
                                }
-                                       
-                               
+
+
 
                        } catch( IllegalStateException e) {
                                break;
@@ -269,14 +275,14 @@ public class DefaultServiceProtocolHandl
                        }
                        catch (Exception e) {
                                logger.log(Level.WARNING,"",e);
-                       }               
+                       }
                }
                stopLatch.countDown();
                logger.log(Level.INFO,"ProtocolHandler terminated");
                return String.valueOf(Thread.currentThread().getId());
        }
 
-       
+
        private void delegateStop() {
                service.stop();
        }
@@ -302,8 +308,8 @@ public class DefaultServiceProtocolHandl
        public void setTransport(IServiceTransport transport) {
                this.transport = transport;
        }
-       
-       
+
+
         public static class Builder {
                        private IServiceTransport transport;
                        private IServiceProcessor processor;
@@ -320,15 +326,15 @@ public class DefaultServiceProtocolHandl
                        public Builder withProcessor(IServiceProcessor 
processor) {
                                this.processor = processor;
                                return this;
-                       }                       
+                       }
                        public Builder withInitCompleteLatch(CountDownLatch 
initLatch) {
                                this.initLatch = initLatch;
                                return this;
-                       }                       
+                       }
                        public Builder withDoneLatch(CountDownLatch stopLatch) {
                                this.stopLatch = stopLatch;
                                return this;
-                       }                       
+                       }
                        public Builder 
withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
                                this.strategy = strategy;
                                return this;


Reply via email to