Author: cwiklik
Date: Fri Nov  7 17:33:53 2014
New Revision: 1637417

URL: http://svn.apache.org/r1637417
Log:
UIMA-4076 initial implementation of JD and JP transport

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml
    uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/deploy/ServiceStateNotificationAdapter.java Fri Nov  7 17:33:53 2014
@@ -23,12 +23,6 @@ import java.util.List;
 import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 
-
-/**
- * Interface to 
- * 
- *
- */
 public interface ServiceStateNotificationAdapter {
        public void notifyAgentWithStatus(ProcessState state);
        public void notifyAgentWithStatus(ProcessState state, String message);

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/pom.xml Fri Nov  7 17:33:53 2014
@@ -1,76 +1,69 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one
-   or more contributor license agreements.  See the NOTICE file
-   distributed with this work for additional information
-   regarding copyright ownership.  The ASF licenses this file
-   to you under the Apache License, Version 2.0 (the
-   "License"); you may not use this file except in compliance
-   with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing,
-   software distributed under the License is distributed on an
-   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-   KIND, either express or implied.  See the License for the
-   specific language governing permissions and limitations
-   under the License.    
--->    
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>uima-ducc-parent</artifactId>
-    <groupId>org.apache.uima</groupId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../uima-ducc-parent/pom.xml</relativePath>
-  </parent>
-  
-  <!-- Inherits groupid and version from the parent pom project coordinates -->
-  <!-- Uses default packaging ie. jar                                       -->
-  <artifactId>uima-ducc-container</artifactId>
-  <name>${uima.ducc} ${project.artifactId}</name>
-  
-   <!-- Special inheritance note even though the <scm> element that follows 
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+       license agreements. See the NOTICE file distributed with this work for additional
+       information regarding copyright ownership. The ASF licenses this file to
+       you under the Apache License, Version 2.0 (the "License"); you may not use
+       this file except in compliance with the License. You may obtain a copy of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+       by applicable law or agreed to in writing, software distributed under the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+       
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <artifactId>uima-ducc-parent</artifactId>
+               <groupId>org.apache.uima</groupId>
+               <version>2.0.0-SNAPSHOT</version>
+               <relativePath>../uima-ducc-parent/pom.xml</relativePath>
+       </parent>
+
+       <!-- Inherits groupid and version from the parent pom project coordinates -->
+       <!-- Uses default packaging ie. jar -->
+       <artifactId>uima-ducc-container</artifactId>
+       <name>${uima.ducc} ${project.artifactId}</name>
+
+       <!-- Special inheritance note even though the <scm> element that follows
                is exactly the same as those in super poms, it cannot be inherited because
                there is some special code that computes the connection elements from the
                chain of parent poms, if this is omitted. Keeping this a bit factored allows
                cutting/pasting the <scm> element, and just changing the following two properties -->
-  <scm>
-       <connection>
+       <scm>
+               <connection>
       
scm:svn:http://svn.apache.org/repos/asf/uima/sandbox/uima-ducc/trunk/uima-ducc-container
     </connection>
-       <developerConnection>
+               <developerConnection>
       
scm:svn:https://svn.apache.org/repos/asf/uima/sandbox/uima-ducc/trunk/uima-ducc-container
     </developerConnection>
-       <url>
+               <url>
       
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container
     </url>
-  </scm>
-  <dependencyManagement>
+       </scm>
+       <dependencyManagement>
                <dependencies>
-            <dependency>
+                       <dependency>
                                <groupId>org.apache.uima</groupId>
-                           <artifactId>uima-ducc-user</artifactId>
+                               <artifactId>uima-ducc-user</artifactId>
                                <version>${project.version}</version>
                        </dependency>
                </dependencies>
-  </dependencyManagement>
-  
-  <dependencies>
-        <!-- Dependencies on other DUCC projects -->
-        <dependency>
+       </dependencyManagement>
+
+       <dependencies>
+               <!-- Dependencies on other DUCC projects -->
+               <dependency>
                        <groupId>org.apache.uima</groupId>
                        <artifactId>uima-ducc-common</artifactId>
                </dependency>
 
-       
-        <dependency>
+
+               <dependency>
                        <groupId>org.apache.uima</groupId>
                        <artifactId>uima-ducc-user</artifactId>
                </dependency>
        </dependencies>
-  
        <build>
                <pluginManagement>
                   <plugins>

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml Fri Nov  7 17:33:53 2014
@@ -138,7 +138,9 @@
                <commons.cli.version>1.2</commons.cli.version>
                <joda.time.version>2.1</joda.time.version>
                <jsch.version>0.1.29</jsch.version>
-               <commons.httpclient.version>3.1</commons.httpclient.version>
+               <!-- commons.httpclient.version>4.3.5</commons.httpclient.version>
+               <commons.httpcore.version>4.3.5</commons.httpcore.version -->
+               
                <commons.codec.version>1.2</commons.codec.version>
                <commons.collections.version>3.2.1</commons.collections.version>
                <commons.lang.version>2.6</commons.lang.version>
@@ -165,9 +167,9 @@
                <servlet-api.version>2.5</servlet-api.version>
                <derby.version>10.10.1.1</derby.version>
         
-               <http.commons.client.version>4.2.1</http.commons.client.version>
-               
-               <http.commons.core.version>4.2.1</http.commons.core.version>
+               <http.commons.client.version>4.3.5</http.commons.client.version>
+               <http.commons.client-cache.version>4.3.5</http.commons.client-cache.version>
+               <http.commons.core.version>4.3.2</http.commons.core.version>
         
         <!-- Needed for NOTICE file packaged in each jar under META-INF -->
         <projectTimeSpan>2012</projectTimeSpan>
@@ -230,6 +232,12 @@ ${uimaDUCCNoticeText}
             </dependency>
         
             <dependency>
+              <groupId>org.apache.httpcomponents</groupId>
+              <artifactId>httpclient-cache</artifactId>
+              <version>${http.commons.client-cache.version}</version>
+            </dependency>
+            
+            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>${http.commons.core.version}</version>
@@ -541,11 +549,11 @@ ${uimaDUCCNoticeText}
                                <version>${aopalliance.version}</version>
                        </dependency -->
 
-                       <dependency>
+                       <!--dependency>
                                <groupId>commons-httpclient</groupId>
                                <artifactId>commons-httpclient</artifactId>
                                <version>${commons.httpclient.version}</version>
-                       </dependency>
+                       </dependency -->
 
                    
                
@@ -569,6 +577,13 @@ ${uimaDUCCNoticeText}
         <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
+           <scope>runtime</scope>
+        </dependency>
+        
+        <dependency>
+           <groupId>org.apache.httpcomponents</groupId>
+           <artifactId>httpclient-cache</artifactId>
+           <scope>runtime</scope>
         </dependency>
         
         <dependency>

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/pom.xml Fri Nov  7 17:33:53 2014
@@ -64,6 +64,11 @@
                        <scope>compile</scope>
                </dependency>
 
+                       <dependency>
+                               <groupId>org.apache.camel</groupId>
+                               <artifactId>camel-jetty</artifactId>
+                       </dependency>
+
 
                <dependency>
                        <groupId>org.apache.uima</groupId>
@@ -74,12 +79,17 @@
 
 
 
-        <dependency>
+        <!-- dependency>
             <groupId>commons-httpclient</groupId>
             <artifactId>commons-httpclient</artifactId>
         </dependency>
 
         <dependency>
+            <groupId>commons-httpcore</groupId>
+            <artifactId>commons-httpcore</artifactId>
+        </dependency -->
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
                <scope>test</scope>

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/agent/ProcessStateUpdate.java Fri Nov  7 17:33:53 2014
@@ -25,6 +25,8 @@ import org.apache.uima.ducc.transport.ev
 
 
 public class ProcessStateUpdate implements Serializable {
+         public static final String ProcessStateUpdatePort = "ducc.agent.process.state.update.port";
+
        /**
    * 
    */

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jd/JobDriverConfiguration.java Fri Nov  7 17:33:53 2014
@@ -1,5 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
 package org.apache.uima.ducc.transport.configuration.jd;
 
-public class JobDriverConfiguration {
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jetty.JettyHttpComponent;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.configuration.jd.iface.IJobDriverComponent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+
+       /**
+        * A {@link JobDriverConfiguration} to configure JobDriver component. 
Depends on 
+        * properties loaded by a main program into System properties. 
+        * 
+        */
+       @Configuration
+       @Import({DuccTransportConfiguration.class,CommonConfiguration.class})
+       public class JobDriverConfiguration {
+               //      use Spring magic to autowire (instantiate and bind) 
CommonConfiguration to a local variable
+               @Autowired CommonConfiguration common;
+               //      use Spring magic to autowire (instantiate and bind) 
DuccTransportConfiguration to a local variable
+               @Autowired DuccTransportConfiguration jobDriverTransport;
+               
+               /**
+                * Instantiate {@link JobDriverEventListener} which will handle 
incoming messages.
+                * 
+                * @param jd - {@link JobDriverComponent} instance
+                * @return - {@link JobDriverEventListener}
+                */
+               public JobDriverEventListener 
jobDriverDelegateListener(IJobDriverComponent jdc) {
+                       JobDriverEventListener jdel =  new 
JobDriverEventListener(jdc);
+                       return jdel;
+               }
+               /**
+                * Create a Router to handle incoming messages from a given 
endpoint. All messages are delegated
+                * to a provided listener. Note: Camel uses introspection to 
determine which method to call when
+                * delegating a message. The name of the method doesnt matter 
it is the argument that needs
+                * to match the type of object in the message. If there is no 
method with a matching argument
+                * type the message will not be delegated.
+                * 
+                * @param endpoint - endpoint where messages are expected
+                * @param delegate - {@link JobDriverEventListener} instance
+                * @return - initialized {@link RouteBuilder} instance
+                * 
+                */
+               public synchronized RouteBuilder 
routeBuilderForIncomingRequests(final String endpoint, final 
JobDriverEventListener delegate) {
+               return new RouteBuilder() {
+                   public void configure() {
+                       from(endpoint)
+                       .bean(delegate);
+                   }
+               };
+           }
+
+               
+               /**
+                * Creates Camel router that will publish Dispatched Job state 
at regular intervals.
+                * 
+                * @param targetEndpointToReceiveJdStateUpdate - endpoint where 
to publish Jd state 
+                * @param statePublishRate - how often to publish state
+                * @return
+                * @throws Exception
+                */
+               private RouteBuilder routeBuilderForJdStatePost(final 
IJobDriverComponent jdc, final String targetEndpointToReceiveJdStateUpdate, 
final int statePublishRate) throws Exception {
+                       final JobDriverStateProcessor jdsp =  // an object 
responsible for generating the state 
+                               new JobDriverStateProcessor(jdc);
+                       
+                       return new RouteBuilder() {
+                             public void configure() {
+                               
from("timer:jdStateDumpTimer?fixedRate=true&period=" + statePublishRate)
+                                       .process(jdsp)
+                                       
.to(targetEndpointToReceiveJdStateUpdate);
+                             }
+                           };
+
+               }
+               private RouteBuilder routeBuilderForJpIncomingRequests(final 
CamelContext camelContext, final JobDriverEventListener delegate, final int 
port, final String app) throws Exception {
+                   return new RouteBuilder() {
+                       public void configure() throws Exception {
+                           JettyHttpComponent jetty = new JettyHttpComponent();
+                           jetty.setMaxThreads(4);  // Need to parameterize
+                           jetty.setMinThreads(1);
+                           camelContext.addComponent("jetty", jetty);
+                           // listen on all interfaces.
+                           from("jetty:http://0.0.0.0:" + port + "/"+app).
+                             bean(delegate);
+                       }
+                   };
+               }
+               
+               /**
+                * Camel Processor responsible for generating Dispatched Job's 
state.
+                * 
+                */
+               private class JobDriverStateProcessor implements Processor {
+                       private IJobDriverComponent jdc;
+                       
+                       private JobDriverStateProcessor(IJobDriverComponent 
jdc) {
+                               this.jdc = jdc;
+                       }
+                       public void process(Exchange exchange) throws Exception 
{
+                               // Fetch new state from Dispatched Job
+//                             JdStateDuccEvent sse = jdc.getState();
+                               //      Add the state object to the Message
+//                             exchange.getIn().setBody(sse);
+                       }
+                       
+               }
+               
+               /**
+                * Creates and initializes {@link JobDriverComponent} instance. 
@Bean annotation identifies {@link JobDriverComponent}
+                * as a Spring framework Bean which will be managed by Spring 
container.  
+                * 
+                * @return {@link JobDriverComponent} instance
+                * 
+                * @throws Exception
+                */
+               @Bean 
+               public JobDriverComponent jobDriver() throws Exception {
+                       JobDriverComponent jdc = new 
JobDriverComponent("JobDriver", common.camelContext(), this);
+               //      Instantiate delegate listener to receive incoming 
messages. 
+               JobDriverEventListener delegateListener = 
this.jobDriverDelegateListener(jdc);
+                       //      Inject a dispatcher into the listener in case 
it needs to send
+                       //  a message to another component
+               
delegateListener.setDuccEventDispatcher(jobDriverTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint,
 jdc.getContext()));
+                       //      Inject Camel Router that will delegate messages 
to JobDriver delegate listener
+                       
jdc.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.orchestratorAbbreviatedStateUpdateEndpoint,
 delegateListener));
+                       
+                       int port = Utils.findFreePort();
+                       String jdUniqueId = "jdApp";
+                       
jdc.getContext().addRoutes(this.routeBuilderForJpIncomingRequests(jdc.getContext(),
 delegateListener, port, jdUniqueId));
+                       
jdc.getContext().addRoutes(this.routeBuilderForJdStatePost(jdc, 
common.jdStateUpdateEndpoint, Integer.parseInt(common.jdStatePublishRate)));
+                       return jdc;
+               }
 
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessConfiguration.java Fri Nov  7 17:33:53 2014
@@ -1,5 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.uima.ducc.transport.configuration.jp;
 
-public class JobProcessConfiguration {
+import java.net.InetAddress;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.container.jp.JobProcessManager;
+import org.apache.uima.ducc.transport.DuccExchange;
+import org.apache.uima.ducc.transport.DuccTransportConfiguration;
+import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+
+@Configuration
+@Import({ DuccTransportConfiguration.class, CommonConfiguration.class })
+public class JobProcessConfiguration  {
+       @Autowired
+       DuccTransportConfiguration transport;
+       @Autowired
+       CommonConfiguration common;
+       JobProcessComponent duccComponent = null;
+       JobProcessManager jobProcessManager = null;
+       AgentSession agent = null;
+       //protected ProcessState currentState = ProcessState.Undefined;
+       //protected ProcessState previousState = ProcessState.Undefined;
+       RouteBuilder routeBuilder;
+       CamelContext camelContext;
+
+       /**
+        * Creates Camel Router to handle incoming messages
+        * 
+        * @param delegate
+        *            - {@code AgentEventListener} to delegate messages to
+        * 
+        * @return {@code RouteBuilder} instance
+        */
+       public synchronized RouteBuilder routeBuilderForIncomingRequests(
+                       final String thisNodeIP, final JobProcessEventListener 
delegate) {
+               return new RouteBuilder() {
+                       // Custom filter to select messages that are targeted 
for this
+                       // process. Checks the Node IP in a message to 
determine if 
+                       // this process is the target.
+                       Predicate filter = new DuccProcessFilter(thisNodeIP);
+
+                       public void configure() throws Exception {
+                               System.out
+                                               .println("Service Wrapper 
Starting Request Channel on Endpoint:"
+                                                               + 
common.managedServiceEndpoint);
+                               onException(Exception.class).handled(true)
+                                               .process(new 
ErrorProcessor()).end();
+
+                               from(common.managedServiceEndpoint)
+
+                               .choice().when(filter).bean(delegate).end()
+                                               
.setId(common.managedServiceEndpoint);
+
+                       }
+               };
+       }
+
+       public class ErrorProcessor implements Processor {
+
+               public void process(Exchange exchange) throws Exception {
+                       // the caused by exception is stored in a property on 
the exchange
+                       Throwable caused = 
exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
+                                       Throwable.class);
+                       caused.printStackTrace();
+                       // 
System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1");
+                       // assertNotNull(caused);
+                       // here you can do what you want, but Camel regard this 
exception as
+                       // handled, and
+                       // this processor as a failurehandler, so it wont do 
redeliveries.
+                       // So this is the
+                       // end of this route. But if we want to route it 
somewhere we can
+                       // just get a
+                       // producer template and send it.
+
+                       // send it to our mock endpoint
+                       // 
exchange.getContext().createProducerTemplate().send("mock:myerror",
+                       // exchange);
+               }
+       }
+
+       
+       @Bean
+       public JobProcessComponent getProcessManagerInstance() throws Exception 
{
+               try {
+                       // Assume IP address provided from environment. In 
production this
+                       // will be the actual node IP. In testing, the IP can 
be virtual
+                       // when running multiple agents on the same node. The 
agent is
+                       // responsible for providing the IP in this process 
environment.
+                       String thisNodeIP = (System.getenv("IP") == null) ? 
InetAddress
+                                       .getLocalHost().getHostAddress() : 
System.getenv("IP");
+                       camelContext = common.camelContext();
+                       int serviceSocketPort = 0;
+                       String agentSocketParams = "";
+                       String jpSocketParams = "";
+                       if (common.managedServiceEndpointParams != null) {
+                               jpSocketParams = "?" + 
common.managedServiceEndpointParams;
+                       }
+
+                       if (common.managedProcessStateUpdateEndpointParams != 
null) {
+                               agentSocketParams = "?"
+                                               + 
common.managedProcessStateUpdateEndpointParams;
+                       }
+                       // set up agent socket endpoint where this UIMA AS 
service will send
+                       // state updates
+                       if (common.managedProcessStateUpdateEndpointType != null
+                                       && 
common.managedProcessStateUpdateEndpointType
+                                                       
.equalsIgnoreCase("socket")) {
+                               common.managedProcessStateUpdateEndpoint = 
"mina:tcp://localhost:"
+                                               + 
System.getProperty(ProcessStateUpdate.ProcessStateUpdatePort)
+                                               + agentSocketParams;
+                       }
+                       // set up a socket endpoint where the UIMA AS service 
will receive
+                       // events sent from its agent
+                       if (common.managedServiceEndpointType != null
+                                       && common.managedServiceEndpointType
+                                                       
.equalsIgnoreCase("socket")) {
+                               serviceSocketPort = Utils.findFreePort();
+                               // service is on the same node as the agent
+                               common.managedServiceEndpoint = 
"mina:tcp://localhost:"
+                                               + serviceSocketPort + 
jpSocketParams;
+                       }
+
+                       DuccEventDispatcher eventDispatcher = transport
+                                       .duccEventDispatcher(
+                                                       
common.managedProcessStateUpdateEndpoint,
+                                                       camelContext);
+
+//                     ManagedUimaService service = 
+//                             new ManagedUimaService(common.saxonJarPath,
+//                                             common.dd2SpringXslPath, 
+//                                             
serviceAdapter(eventDispatcher,common.managedServiceEndpoint), camelContext);
+                       
+//                     service.setConfigFactory(this);
+//                 
service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);
+            
+                       // Create an Agent proxy. This is used to notify the 
Agent
+                       // of state changes.
+                       agent = new AgentSession(eventDispatcher,
+                                       System.getenv("ProcessDuccId"), 
common.managedServiceEndpoint);
+
+                       
+                       System.out
+                                       
.println("#######################################################");
+                       System.out.println("## Agent Service State Update 
Endpoint:"
+                                       + 
common.managedProcessStateUpdateEndpoint + " ##");
+                       System.out
+                                       
.println("#######################################################");
+
+//                     JobProcessEventListener delegateListener = 
processDelegateListener(jobProcessManager);
+//                     
delegateListener.setDuccEventDispatcher(eventDispatcher);
+                       
+                       jobProcessManager = new JobProcessManager();
+                       // Create Lifecycle manager responsible for handling 
start event
+                       // initiated by the Ducc framework. It will eventually 
call the
+                       // start(String[] args) method on 
JobProcessConfiguration object
+                       // which kicks off initialization of UIMA pipeline and 
processing
+                       // begins.
+                       duccComponent = 
+                                       new JobProcessComponent("UimaProcess", 
camelContext, this);
+                       duccComponent.setAgentSession(agent);
+                       duccComponent.setJobProcessManager(jobProcessManager);
+                       
+                       JobProcessEventListener eventListener = 
+                                       new 
JobProcessEventListener(duccComponent);
+                       routeBuilder = 
this.routeBuilderForIncomingRequests(thisNodeIP, eventListener);
+
+                       camelContext.addRoutes(routeBuilder);
+
+                       return duccComponent;
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       throw e;
+               }
+       }
+/*
+       public void start(String[] args) {
+               try {
+                       String jps = 
System.getProperty("org.apache.uima.ducc.userjarpath");
+                       if (null == jps) {
+                               System.err
+                                               .println("Missing the 
-Dorg.apache.uima.jarpath=XXXX property");
+                               System.exit(1);
+                       }
+                       String processJmxUrl = duccComponent.getProcessJmxUrl();
+                       agent.notify(ProcessState.Initializing, processJmxUrl);
+                       IUimaProcessor uimaProcessor = null; 
+                       ScheduledThreadPoolExecutor executor = null;
+                       
+                       try {
+                               executor = new ScheduledThreadPoolExecutor(1);
+                               executor.prestartAllCoreThreads();
+                               // Instantiate a UIMA AS jmx monitor to poll 
for status of the AE.
+                               // This monitor checks if the AE is 
initializing or ready.
+                               JmxAEProcessInitMonitor monitor = new 
JmxAEProcessInitMonitor(agent);
+                               executor.scheduleAtFixedRate(monitor, 20, 30, 
TimeUnit.SECONDS);
+
+                       // Deploy UIMA pipelines. This blocks until the pipelines initialize or
+                       // there is an exception. The IUimaProcessor is a wrapper around the
+                       // processing container where the analysis is being done.
+                       uimaProcessor =
+                                       jobProcessManager.deploy(jps, args, 
"org.apache.uima.ducc.user.jp.UserProcessContainer");
+                               
+                       // pipelines deployed and initialized. This process is now Ready
+                       // for processing
+                       currentState = ProcessState.Running;
+                               // Update agent with the most up-to-date state 
of the pipeline
+                       //      monitor.run();
+                               // all is well, so notify agent that this 
process is in Running state
+                               agent.notify(currentState, processJmxUrl);
+                // Create thread pool and begin processing
+                               
+                               
+                               
+                   } catch( Exception ee) {
+                       currentState = ProcessState.FailedInitialization;
+                               System.out
+                                               .println(">>> Failed to Deploy 
UIMA Service. Check UIMA Log for Details");
+                               agent.notify(ProcessState.FailedInitialization);
+                   } finally {
+                               // Stop executor. It was only needed to poll AE 
initialization status.
+                               // Since deploy() completed
+                               // the UIMA AS service either succeeded 
initializing or it failed. In
+                               // either case we no longer
+                               // need to poll for initialization status
+                       if ( executor != null ) {
+                               executor.shutdownNow();
+                       }
+                       
+                   }
+                       
+
+
+               } catch( Exception e) {
+                       currentState = ProcessState.FailedInitialization;
+                       agent.notify(currentState);
+
+                       
+               }
+       }
+       */
+
+/*
+       public void stop() {
+        try {
+               //agent.stop();
+               
+               if (camelContext != null) {
+                       for (Route route : camelContext.getRoutes()) {
+
+                               route.getConsumer().stop();
+                               System.out.println(">>> configFactory.stop() - 
stopped route:"
+                                               + route.getId());
+                       }
+               }
+               } catch( Exception e) {
+                       
+               }
+               
+               
+       }
+*/
+       /**
+        * Camel {@link Predicate} that accepts an exchange only when it is addressed to
+        * this process: the {@code DuccExchange.ProcessPID} header must equal the value
+        * returned by {@code Utils.getPID()} and the {@code DuccExchange.DUCCNODEIP}
+        * header must equal the node IP supplied at construction.
+        */
+       private static class DuccProcessFilter implements Predicate {
+               // Node IP this process identifies itself with; compared against the
+               // DUCCNODEIP header of every incoming exchange.
+               private final String thisNodeIP;
+
+               /**
+                * @param thisNodeIP IP that incoming messages must target to be accepted
+                */
+               public DuccProcessFilter(final String thisNodeIP) {
+                       this.thisNodeIP = thisNodeIP;
+               }
+
+               /**
+                * Checks whether the given exchange targets this process.
+                *
+                * @param exchange incoming exchange whose "in" message headers are inspected
+                * @return {@code true} only if both the PID header and the node IP header
+                *         match this process; {@code false} otherwise, including when
+                *         inspecting the headers fails
+                */
+               @Override
+               public synchronized boolean matches(Exchange exchange) {
+                       boolean result = false;
+                       try {
+                               String pid = (String) exchange.getIn().getHeader(DuccExchange.ProcessPID);
+                               String targetIP = (String) exchange.getIn().getHeader(DuccExchange.DUCCNODEIP);
+                               // The message targets this process only if both the process PID
+                               // and the node IP match.
+                               if (Utils.getPID().equals(pid) && thisNodeIP.equals(targetIP)) {
+                                       result = true;
+                                       System.out.println(">>>>>>>>> Process Received a Message. Is Process target for message:"
+                                                       + result + ". Target PID:" + pid);
+                               }
+                       } catch (Throwable e) {
+                               // Best effort: any failure while reading the headers (missing header,
+                               // unexpected header type, ...) is reported and the exchange is treated
+                               // as not addressed to this process.
+                               e.printStackTrace();
+                       }
+                       return result;
+               }
+       }
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java?rev=1637417&r1=1637416&r2=1637417&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-user/src/main/java/org/apache/uima/ducc/user/jp/UimaProcessContainer.java
 Fri Nov  7 17:33:53 2014
@@ -1,4 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.uima.ducc.user.jp;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -40,10 +60,10 @@ public class UimaProcessContainer {
        private static final Class CLASS_NAME = UimaProcessContainer.class;
        private static final char FS = 
System.getProperty("file.separator").charAt(
                        0);
-       public static BrokerService broker = null;// new BrokerService();
+       public static BrokerService broker = null;
        private UimaSerializer uimaSerializer = new UimaSerializer();
 
-       public int  deploy(String[] args) throws Exception {
+       public int deploy(String[] args) throws Exception {
 
                broker = new BrokerService();
                broker.setDedicatedTaskRunner(false);
@@ -79,7 +99,7 @@ public class UimaProcessContainer {
                        ids[i] = deployService(dd);
                }
                // initialize and start UIMA-AS client. This sends GetMeta 
request to
-               // deployed top level service and waits for reply
+               // deployed top level service and waits for a reply
                initializeUimaAsClient(endpointName);
 
                return scaleout;
@@ -89,18 +109,18 @@ public class UimaProcessContainer {
                System.out.println("Stopping UIMA_AS Client");
                try {
                        uimaASClient.stop();
-                       
-               } catch( Exception e) {
+
+               } catch (Exception e) {
                        e.printStackTrace();
                }
                System.out.println("Stopping Broker");
                broker.stop();
                broker.waitUntilStopped();
        }
-       public void initializeUimaAsClient(String endpoint) throws Exception {
 
-               String brokerURL = System.getProperty("DefaultBrokerURL");// 
"vm://localhost?broker.persistent=false";
+       public void initializeUimaAsClient(String endpoint) throws Exception {
 
+               String brokerURL = System.getProperty("DefaultBrokerURL");
                Map<String, Object> appCtx = new HashMap<String, Object>();
                appCtx.put(UimaAsynchronousEngine.ServerUri, brokerURL);
                appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
@@ -154,8 +174,9 @@ public class UimaProcessContainer {
                CAS cas = uimaASClient.getCAS();
                XmiSerializationSharedData deserSharedData = new 
XmiSerializationSharedData();
 
-               uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, 
true,-1);
-      // System.out.println("Sending CAS to JD");
+               uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, 
true,
+                               -1);
+
                uimaASClient.sendAndReceiveCAS(cas);
                cas.release();
        }
@@ -176,9 +197,8 @@ public class UimaProcessContainer {
                endpointName = getArg("-q", args);
 
                if (nbrOfArgs < 1
-                               || (deploymentDescriptors.length == 0
-//                                             || (args[0].startsWith("-") && 
(deploymentDescriptors.length == 0
-                                               || saxonURL.equals("") || 
xslTransform.equals(""))) {
+                               || (deploymentDescriptors.length == 0 || 
saxonURL.equals("") || xslTransform
+                                               .equals(""))) {
                        printUsageMessage();
                        return null; // Done here
                }
@@ -313,19 +333,19 @@ public class UimaProcessContainer {
 
                public void onBeforeProcessCAS(UimaASProcessStatus status,
                                String nodeIP, String pid) {
-//                     System.out
-//                                     .println("runTest: onBeforeProcessCAS() 
Notification - CAS:"
-//                                                     + 
status.getCasReferenceId()
-//                                                     + " is being processed 
on machine:"
-//                                                     + nodeIP
-//                                                     + " by process (PID):" 
+ pid);
+                       // System.out
+                       // .println("runTest: onBeforeProcessCAS() Notification 
- CAS:"
+                       // + status.getCasReferenceId()
+                       // + " is being processed on machine:"
+                       // + nodeIP
+                       // + " by process (PID):" + pid);
                }
 
                public synchronized void 
onBeforeMessageSend(UimaASProcessStatus status) {
                        // casSent = status.getCasReferenceId();
-//                     System.out
-//                                     .println("runTest: Received 
onBeforeMessageSend() Notification With CAS:"
-//                                                     + 
status.getCasReferenceId());
+                       // System.out
+                       // .println("runTest: Received onBeforeMessageSend() 
Notification With CAS:"
+                       // + status.getCasReferenceId());
                }
 
                public void onUimaAsServiceExit(EventTrigger cause) {
@@ -340,16 +360,16 @@ public class UimaProcessContainer {
                        String casReferenceId = ((UimaASProcessStatus) 
aProcessStatus)
                                        .getCasReferenceId();
 
-//                     if (aProcessStatus instanceof UimaASProcessStatus) {
-//                             if (aProcessStatus.isException()) {
-//                                     System.out
-//                                                     .println("--------- Got 
Exception While Processing CAS"
-//                                                                     + 
casReferenceId);
-//                             } else {
-//                                     System.out.println("Client Received 
Reply - CAS:"
-//                                                     + casReferenceId);
-//                             }
-//                     }
+                       // if (aProcessStatus instanceof UimaASProcessStatus) {
+                       // if (aProcessStatus.isException()) {
+                       // System.out
+                       // .println("--------- Got Exception While Processing 
CAS"
+                       // + casReferenceId);
+                       // } else {
+                       // System.out.println("Client Received Reply - CAS:"
+                       // + casReferenceId);
+                       // }
+                       // }
                }
 
                /**
@@ -362,16 +382,16 @@ public class UimaProcessContainer {
                        String casReferenceId = ((UimaASProcessStatus) 
aProcessStatus)
                                        .getCasReferenceId();
 
-//                     if (aProcessStatus instanceof UimaASProcessStatus) {
-//                             if (aProcessStatus.isException()) {
-//                                     System.out
-//                                                     .println("--------- Got 
Exception While Processing CAS"
-//                                                                     + 
casReferenceId);
-//                             } else {
-//                                     System.out.println("Client Received 
Reply - CAS:"
-//                                                     + casReferenceId);
-//                             }
-//                     }
+                       // if (aProcessStatus instanceof UimaASProcessStatus) {
+                       // if (aProcessStatus.isException()) {
+                       // System.out
+                       // .println("--------- Got Exception While Processing 
CAS"
+                       // + casReferenceId);
+                       // } else {
+                       // System.out.println("Client Received Reply - CAS:"
+                       // + casReferenceId);
+                       // }
+                       // }
 
                }
 


Reply via email to