[
https://issues.apache.org/jira/browse/CAMEL-12931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684806#comment-16684806
]
ASF GitHub Bot commented on CAMEL-12931:
----------------------------------------
oscerd closed pull request #2609: CAMEL-12931 - Upgrade jBPM component to use 7
series with consumer ca…
URL: https://github.com/apache/camel/pull/2609
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/components/camel-jbpm/pom.xml b/components/camel-jbpm/pom.xml
index 6c197d48b95..2a5a161e5db 100644
--- a/components/camel-jbpm/pom.xml
+++ b/components/camel-jbpm/pom.xml
@@ -43,32 +43,55 @@
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
- <groupId>org.kie.remote</groupId>
- <artifactId>kie-remote-client</artifactId>
+ <groupId>org.kie.server</groupId>
+ <artifactId>kie-server-client</artifactId>
<version>${jbpm-version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.hornetq</groupId>
- <artifactId>hornetq-core-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.hornetq</groupId>
- <artifactId>hornetq-jms-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.spec.javax.jms</groupId>
- <artifactId>jboss-jms-api_1.1_spec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.spec.javax.xml.ws</groupId>
- <artifactId>jboss-jaxws-api_2.2_spec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.logging</groupId>
- <artifactId>jboss-logging</artifactId>
- </exclusion>
- </exclusions>
</dependency>
+
+ <!-- jBPM consumer -->
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-api</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.kie</groupId>
+ <artifactId>kie-internal</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-services-api</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-case-mgmt-api</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.kie.server</groupId>
+ <artifactId>kie-server-services-common</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.jbpm</groupId>
+ <artifactId>jbpm-persistence-api</artifactId>
+ <version>${jbpm-version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
diff --git a/components/camel-jbpm/src/main/docs/jbpm-component.adoc b/components/camel-jbpm/src/main/docs/jbpm-component.adoc
index 70e7796cdde..46928d21565 100644
--- a/components/camel-jbpm/src/main/docs/jbpm-component.adoc
+++ b/components/camel-jbpm/src/main/docs/jbpm-component.adoc
@@ -4,9 +4,9 @@
*Available as of Camel version 2.6*
The *jbpm* component provides integration with Business Process
-Management (BPM) Suit http://www.jbpm.org/[jBPM]. It uses
-kie-remote-client API to interact with jBPM instance over REST. The
-component supports only producer.
+Management http://www.jbpm.org/[jBPM]. It uses the
+kie-server-client API to interact with a jBPM instance over REST. The
+component supports both producers and consumers.
Maven users will need to add the following dependency to their `pom.xml`
for this component:
@@ -20,6 +20,111 @@ for this component:
</dependency>
------------------------------------------------------------------------------------
+## Consumer
+
+The jBPM consumer allows routes to be attached to the following listener types:
+
+* ProcessEventListeners
+* TaskEventListeners
+* CaseEventListeners
+
+### URI format
+
+[source,java]
+---------------------------------------------
+jbpm:events:type:[classifier][?options]
+---------------------------------------------
+
+==== Path Parameters (3 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *events* | Classifier for the consumer to know which type of data it should attach to | | URL
+| *type* | Type of event listener - supports: process, task, case | | String
+| *classifier* | Used to distinguish routes for the same event type | | String
+|===
+
+Each route then receives events as they are produced by the jBPM engine.
+
+Routes can be defined either globally, at the application level, or deployed
+together with business asset projects, also known as KJARs.
+
+Consumers are configured via KieServerExtension, a pluggable interface for enhancing
+jBPM with additional capabilities. It reacts to the different life cycle phases of the KIE Server
+and is thereby able to configure individual endpoints properly.
+
+### KJAR routes
+
+Create a file named `camel-routes.xml` in the root folder of your KJAR (src/main/resources). It will be automatically
+discovered, and a Camel context for the given KJAR will be created.
+
+### Global routes
+
+Create a file named `global-camel-routes` in the root of the class path of KIE Server. It will be automatically found and registered
+on every KJAR deployed to KIE Server.
+
+
+Example `camel-routes.xml` file that can be placed in the KJAR:
+
+[source, xml]
+----
+<routes xmlns="http://camel.apache.org/schema/spring">
+
+ <route id="processes">
+ <from uri="jbpm:events:process:test"/>
+ <filter>
+ <simple>${in.header.EventType} == 'beforeProcessStarted'</simple>
+ <to uri="log:kjar.processes?level=INFO&amp;showBody=true&amp;showHeaders=true"/>
+ </filter>
+ </route>
+
+ <route id="tasks">
+ <from uri="jbpm:events:task:test"/>
+ <filter>
+ <simple>${in.header.EventType} starts with 'before'</simple>
+ <to uri="log:kjar.tasks?level=INFO&amp;showBody=true&amp;showHeaders=true"/>
+ </filter>
+ </route>
+</routes>
+----
+
+
+### Use of jBPM Component in KIE Server
+
+To use the camel-jbpm component in KIE Server, add two jars to the KIE Server application:
+
+* camel-core
+* camel-jbpm
+
+Then start KIE Server. Once it has booted, the logs should contain the following information:
+
+[source, plain]
+----
+Camel KIE Server extension has been successfully registered as server extension
+....
+
+Route: tasks started and consuming from: jbpm://events:task:test?deploymentId=form-rendering_1.0.0
+Total 2 routes, of which 2 are started
+Apache Camel 2.23.0-SNAPSHOT (CamelContext: KIE Server Camel context for container evaluation_1.0.0) started in 0.378 seconds
+o.k.server.services.impl.KieServerImpl : Container evaluation_1.0.0 (for release id evaluation:evaluation:1.0.0) successfully started
+----
+
+To use the jBPM consumer, the jBPM deployment descriptor must also define Camel-specific event listeners of the following types:
+
+* `new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()`
+* `new org.apache.camel.component.jbpm.listeners.CamelTaskEventListener()`
+* `new org.apache.camel.component.jbpm.listeners.CamelCaseEventListener()`
+
+These must be set in either the server-level or the KJAR deployment descriptor (use MVEL as the resolver type). See the jBPM docs for more details about
+deployment descriptors.
+
+## Producer
+
+The producer interacts with jBPM via kie-server-client, which uses the REST API exposed by
+jBPM (KIE Server).
+
### URI format
[source,java]
@@ -47,40 +152,46 @@ jbpm:connectionURL
with the following path and query parameters:
-==== Path Parameters (1 parameters):
+==== Path Parameters (2 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
| *connectionURL* | *Required* The URL to the jBPM server. | | URL
+| *eventListenerType* | Sets the event listener type to attach to | | String
|===
-==== Query Parameters (25 parameters):
+==== Query Parameters (30 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *attachmentId* (producer) | attachId to use when retrieving attachments | | Long
-| *contentId* (producer) | contentId to use when retrieving attachments | | Long
-| *deploymentId* (producer) | *Required* The id of the deployment | | String
-| *event* (producer) | the data associated with this event when signalEvent operation is performed | | Object
-| *eventType* (producer) | the type of event to use when signalEvent operation is performed | | String
-| *identifier* (producer) | identifier the global identifier | | String
-| *language* (producer) | The language to use when filtering user tasks | | String
-| *maxNumber* (producer) | the maximum number of rules that should be fired | | Integer
+| *attachmentId* (common) | attachId to use when retrieving attachments | | Long
+| *contentId* (common) | contentId to use when retrieving attachments | | Long
+| *deploymentId* (common) | *Required* The id of the deployment | | String
+| *emitterSendItems* (common) | Sets if event produced by emitter should be sent as single items or complete collection | | Boolean
+| *event* (common) | the data associated with this event when signalEvent operation is performed | | Object
+| *eventType* (common) | the type of event to use when signalEvent operation is performed | | String
+| *identifier* (common) | identifier the global identifier | | String
+| *maxNumber* (common) | the maximum number of rules that should be fired | | Integer
+| *page* (common) | The page to use when retrieving user tasks | | Integer
+| *pageSize* (common) | The page size to use when retrieving user tasks | | Integer
+| *processId* (common) | the id of the process that should be acted upon | | String
+| *processInstanceId* (common) | the id of the process instance | | Long
+| *targetUserId* (common) | The targetUserId used when delegating a task | | String
+| *task* (common) | The task instance to use with task operations | | Task
+| *taskId* (common) | the id of the task | | Long
+| *timeout* (common) | A timeout value | | Integer
+| *userId* (common) | userId to use with task operations | | String
+| *value* (common) | the value to assign to the global identifier | | Object
+| *workItemId* (common) | the id of the work item | | Long
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
+| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern
| *operation* (producer) | The operation to perform | startProcess | String
-| *processId* (producer) | the id of the process that should be acted upon | | String
-| *processInstanceId* (producer) | the id of the process instance | | Long
-| *targetUserId* (producer) | The targetUserId used when delegating a task | | String
-| *task* (producer) | The task instance to use with task operations | | Task
-| *taskId* (producer) | the id of the task | | Long
-| *timeout* (producer) | A timeout value | | Integer
-| *userId* (producer) | userId to use with task operations | | String
-| *value* (producer) | the value to assign to the global identifier | | Object
-| *workItemId* (producer) | the id of the work item | | Long
| *entities* (advanced) | The potentialOwners when nominateTask operation is performed | | List
| *extraJaxbClasses* (advanced) | To load additional classes when working with XML | | Class[]
| *parameters* (advanced) | the variables that should be set for various operations | | Map
@@ -153,23 +264,29 @@ org.infinispan.notifications.cachelistener.event.Event.Type
|CamelJBPMContentId |0 |Long |contentId to use when retrieving attachments
-|CamelJBPMEntityList |null |List<OrganizationalEntity> |The potentialOwners when nominateTask operation is performed
+|CamelJBPMEntityList |null |List<String> |The potentialOwners when nominateTask operation is performed
-|CamelJBPMStatusList |null |List<Status> |The list of status to use when filtering tasks
+|CamelJBPMStatusList |null |List<String> |The list of status to use when filtering tasks
|=======================================================================
### Example
Below is an example route that starts a business process with id
-project1.integration-test and deploymentId
-org.kie.example:project1:1.0.0-SNAPSHOT
+evaluation. To run this example you need jBPM running locally; the easiest way is to use the single zip
+distribution, downloaded from jbpm.org. Next, start it, import the Evaluation sample project, then build and deploy it.
+Once done, this test can be run out of the box.
[source,java]
----------------------------------------------------------------------------------------------
+Map<String, Object> params = new HashMap<>();
+params.put("employee", "wbadmin");
+params.put("reason", "Camel asks for it");
+
from("direct:start")
- .setHeader(JBPMConstants.PROCESS_ID, constant("project1.integration-test"))
- .to("jbpm:http://localhost:8080/business-central?userName=bpmsAdmin&password=pa$word1"
- + "&deploymentId=org.kie.example:project1:1.0.0-SNAPSHOT");
+ .setHeader(JBPMConstants.PROCESS_ID, constant("evaluation"))
+ .setHeader(JBPMConstants.PARAMETERS, constant(params))
+ .to("jbpm:http://localhost:8080/kie-server/services/rest/server?userName=wbadmin&password=wbadmin"
+ + "&deploymentId=evaluation");
----------------------------------------------------------------------------------------------
### See Also
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMCamelConsumerAware.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMCamelConsumerAware.java
new file mode 100644
index 00000000000..85b4df2a626
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMCamelConsumerAware.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm;
+
+/**
+ * Indicates that a class implementing this interface should receive (at some point)
+ * the JBPMConsumer instance that it requires to operate.
+ */
+public interface JBPMCamelConsumerAware {
+
+ void addConsumer(JBPMConsumer consumer);
+
+ void removeConsumer(JBPMConsumer consumer);
+}
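Reviewer sketch (not part of the PR): the listener classes that implement `JBPMCamelConsumerAware` are not visible in this part of the diff, so the following is only a minimal illustration of how such a contract could be used. The types `EventSink`, `SinkAware` and `FanOutListener` are hypothetical stand-ins, not the Camel or jBPM classes; the only things taken from the diff are the `addConsumer`/`removeConsumer` pair and the `sendMessage(eventType, body)` shape of the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for JBPMConsumer: only the sendMessage shape is borrowed.
interface EventSink {
    void sendMessage(String eventType, Object body);
}

// Hypothetical stand-in for JBPMCamelConsumerAware.
interface SinkAware {
    void addConsumer(EventSink consumer);

    void removeConsumer(EventSink consumer);
}

// A listener that forwards every engine callback to all registered consumers.
class FanOutListener implements SinkAware {
    private final Set<EventSink> consumers = new CopyOnWriteArraySet<>();

    @Override
    public void addConsumer(EventSink consumer) {
        consumers.add(consumer);
    }

    @Override
    public void removeConsumer(EventSink consumer) {
        consumers.remove(consumer);
    }

    // Stand-in for an engine callback such as "beforeProcessStarted".
    void fire(String eventType, Object event) {
        for (EventSink consumer : consumers) {
            consumer.sendMessage(eventType, event);
        }
    }
}

class ConsumerAwareSketch {
    // Registers a recording consumer, fires one event, removes the consumer,
    // fires a second event, and returns what the consumer actually received.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        EventSink sink = (type, body) -> received.add(type + ":" + body);
        FanOutListener listener = new FanOutListener();
        listener.addConsumer(sink);
        listener.fire("beforeProcessStarted", "evaluation");
        listener.removeConsumer(sink);
        listener.fire("afterProcessStarted", "evaluation");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The copy-on-write set is a choice made for this sketch, so that consumers can be added or removed while an event is being dispatched; the PR may well handle this differently.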
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
index f562234ed47..ed03b515f17 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMComponent.java
@@ -26,7 +26,11 @@
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
JBPMConfiguration configuration = new JBPMConfiguration();
- configuration.setConnectionURL(new URL(remaining));
+ if (remaining.startsWith("events")) {
+ configuration.setEventListenerType(remaining.split(":")[1]);
+ } else {
+ configuration.setConnectionURL(new URL(remaining));
+ }
setProperties(configuration, parameters);
return new JBPMEndpoint(uri, this, configuration);
}
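Reviewer sketch (not part of the PR): the hunk above takes the listener type from `remaining.split(":")[1]`, which would throw an `ArrayIndexOutOfBoundsException` for a bare `events` path. The snippet below is a hedged, standalone illustration of the same parsing with an explicit guard; the class name `EventUriSketch` and the error message are assumptions made for this example.

```java
class EventUriSketch {

    // Returns the listener type for "events:<type>[:<classifier>]".
    // Returns null when the path is not an events path (for example a connection URL).
    static String listenerType(String remaining) {
        if (!remaining.startsWith("events")) {
            return null;
        }
        String[] parts = remaining.split(":");
        if (parts.length < 2 || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "Expected events:<type>[:<classifier>] but got: " + remaining);
        }
        return parts[1];
    }

    public static void main(String[] args) {
        System.out.println(listenerType("events:process:test"));
        System.out.println(listenerType("http://localhost:8080/kie-server/services/rest/server"));
    }
}
```

Whether a malformed events path should fail fast like this, or fall back to the producer branch, is a design decision for the component authors.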
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java
index c4a30c70690..9633aaa765e 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.jbpm;
import java.net.URL;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -24,8 +25,6 @@
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
-import org.kie.api.task.model.OrganizationalEntity;
-import org.kie.api.task.model.Status;
import org.kie.api.task.model.Task;
@UriParams
@@ -58,7 +57,9 @@
@UriParam
private String userId;
@UriParam
- private String language;
+ private Integer page = 0;
+ @UriParam
+ private Integer pageSize = 10;
@UriParam
private String targetUserId;
@UriParam
@@ -68,9 +69,9 @@
@UriParam
private Task task;
@UriParam(label = "advanced")
- private List<OrganizationalEntity> entities;
+ private List<String> entities;
@UriParam(label = "filter")
- private List<Status> statuses;
+ private List<String> statuses;
@UriParam(label = "security", secret = true)
private String userName;
@UriParam(label = "security", secret = true)
@@ -81,6 +82,11 @@
private Map<String, Object> parameters;
@UriParam(label = "advanced")
private Class[] extraJaxbClasses;
+ @UriParam
+ private Boolean emitterSendItems;
+
+ @UriPath
+ private String eventListenerType;
public String getOperation() {
return operation;
@@ -225,15 +231,26 @@ public void setTask(Task task) {
this.task = task;
}
- public String getLanguage() {
- return language;
+ public Integer getPage() {
+ return page;
+ }
+
+ /**
+ * The page to use when retrieving user tasks
+ */
+ public void setPage(Integer page) {
+ this.page = page;
+ }
+
+ public Integer getPageSize() {
+ return pageSize;
}
/**
- * The language to use when filtering user tasks
+ * The page size to use when retrieving user tasks
*/
- public void setLanguage(String language) {
- this.language = language;
+ public void setPageSize(Integer pageSize) {
+ this.pageSize = pageSize;
}
public String getTargetUserId() {
@@ -269,25 +286,25 @@ public void setContentId(Long contentId) {
this.contentId = contentId;
}
- public List<OrganizationalEntity> getEntities() {
+ public List<String> getEntities() {
return entities;
}
/**
* The potentialOwners when nominateTask operation is performed
*/
- public void setEntities(List<OrganizationalEntity> entities) {
+ public void setEntities(List<String> entities) {
this.entities = entities;
}
- public List<Status> getStatuses() {
+ public List<String> getStatuses() {
return statuses;
}
/**
* The list of status to use when filtering tasks
*/
- public void setStatuses(List<Status> statuses) {
+ public void setStatuses(List<String> statuses) {
this.statuses = statuses;
}
@@ -356,4 +373,32 @@ public void setTimeout(Integer timeout) {
public void setExtraJaxbClasses(Class[] extraJaxbClasses) {
this.extraJaxbClasses = extraJaxbClasses;
}
+
+
+ public String getEventListenerType() {
+ return eventListenerType;
+ }
+
+ /**
+ * Sets the event listener type to attach to
+ */
+ public void setEventListenerType(String eventListenerType) {
+ this.eventListenerType = eventListenerType;
+ }
+
+ public Boolean getEmitterSendItems() {
+ return emitterSendItems;
+ }
+
+ /**
+ * Sets if event produced by emitter should be sent as single items or complete collection
+ */
+ public void setEmitterSendItems(Boolean emitterSendItems) {
+ this.emitterSendItems = emitterSendItems;
+ }
+
+ @Override
+ public String toString() {
+ return "JBPMConfiguration [connectionURL=" + connectionURL + ", operation=" + operation + ", deploymentId=" + deploymentId + ", processInstanceId=" + processInstanceId + ", value=" + value + ", processId=" + processId + ", eventType=" + eventType + ", event=" + event + ", maxNumber=" + maxNumber + ", identifier=" + identifier + ", workItemId=" + workItemId + ", taskId=" + taskId + ", userId=" + userId + ", page=" + page + ", pageSize=" + pageSize + ", targetUserId=" + targetUserId + ", attachmentId=" + attachmentId + ", contentId=" + contentId + ", task=" + task + ", entities=" + entities + ", statuses=" + statuses + ", userName=" + userName + ", password=" + password + ", timeout=" + timeout + ", parameters=" + parameters + ", extraJaxbClasses=" + Arrays.toString(extraJaxbClasses) + ", eventListenerType=" + eventListenerType + "]";
+ }
}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java
index 6b00d02598a..8341cfd619c 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConstants.java
@@ -30,10 +30,17 @@
String TASK_ID = "CamelJBPMTaskId";
String TASK = "CamelJBPMTask";
String USER_ID = "CamelJBPMUserId";
- String TARGET_USER_ID = "CamelJBPMTargetUserId";
- String LANGUAGE = "CamelJBPMLanguage";
+ String TARGET_USER_ID = "CamelJBPMTargetUserId";
String ATTACHMENT_ID = "CamelJBPMAttachmentId";
String CONTENT_ID = "CamelJBPMContentId";
String ENTITY_LIST = "CamelJBPMEntityList";
String STATUS_LIST = "CamelJBPMStatusList";
+ String RESULT_PAGE = "CamelJBPMResultPage";
+ String RESULT_PAGE_SIZE = "CamelJBPMResultPageSize";
+
+
+ String JBPM_PROCESS_EVENT_LISTENER = "process";
+ String JBPM_TASK_EVENT_LISTENER = "task";
+ String JBPM_CASE_EVENT_LISTENER = "case";
+ String JBPM_EVENT_EMITTER = "emitter";
}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
new file mode 100644
index 00000000000..6c5c6f90647
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jbpm.emitters.CamelEventEmitter;
+import org.apache.camel.component.jbpm.listeners.CamelCaseEventListener;
+import org.apache.camel.component.jbpm.listeners.CamelProcessEventListener;
+import org.apache.camel.component.jbpm.listeners.CamelTaskEventListener;
+import org.apache.camel.impl.DefaultConsumer;
+import org.jbpm.services.api.DeploymentEvent;
+import org.jbpm.services.api.DeploymentEventListener;
+import org.jbpm.services.api.DeploymentService;
+import org.jbpm.services.api.ListenerSupport;
+import org.jbpm.services.api.model.DeployedUnit;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.internal.runtime.manager.CacheManager;
+import org.kie.internal.runtime.manager.InternalRuntimeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JBPMConsumer extends DefaultConsumer implements DeploymentEventListener {
+
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(JBPMConsumer.class);
+
+ private JBPMEndpoint endpoint;
+ private JBPMConfiguration configuration;
+
+ public JBPMConsumer(Endpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+
+ this.endpoint = (JBPMEndpoint) endpoint;
+ this.configuration = ((JBPMEndpoint) getEndpoint()).getConfiguration();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ DeploymentService deploymentService = (DeploymentService) ServiceRegistry.get().service(ServiceRegistry.DEPLOYMENT_SERVICE);
+
+ if (configuration.getDeploymentId() != null) {
+ InternalRuntimeManager manager = (InternalRuntimeManager) deploymentService.getRuntimeManager(configuration.getDeploymentId());
+ configure(manager, this);
+
+ LOGGER.debug("JBPM Camel Consumer configured and started for deployment id {}", configuration.getDeploymentId());
+ } else {
+
+ ((ListenerSupport) deploymentService).addListener(this);
+
+ for (DeployedUnit deployed : deploymentService.getDeployedUnits()) {
+ InternalRuntimeManager manager = (InternalRuntimeManager) deployed.getRuntimeManager();
+ configure(manager, this);
+ }
+
+ LOGGER.debug("JBPM Camel Consumer configured and started on all available deployments");
+ }
+
+
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ DeploymentService deploymentService = (DeploymentService) ServiceRegistry.get().service(ServiceRegistry.DEPLOYMENT_SERVICE);
+ if (configuration.getDeploymentId() != null) {
+ LOGGER.debug("JBPM Camel Consumer unconfigured and stopped for deployment id {}", configuration.getDeploymentId());
+ } else {
+ ((ListenerSupport) deploymentService).removeListener(this);
+
+ LOGGER.debug("JBPM Camel Consumer unconfigured and stopped on all available deployments");
+ }
+
+ if (JBPMConstants.JBPM_EVENT_EMITTER.equals(configuration.getEventListenerType())) {
+ ServiceRegistry.get().remove("CamelEventEmitter");
+ }
+
+ }
+
+ public void sendMessage(String eventType, Object body) {
+ Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
+ exchange.getIn().setHeader("EventType", eventType);
+
+ exchange.getIn().setBody(body);
+
+ if (!endpoint.isSynchronous()) {
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ // handle any thrown exception
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
+ }
+ });
+ } else {
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ // handle any thrown exception
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
+ }
+ }
+
+ @Override
+ public void onDeploy(DeploymentEvent event) {
+ InternalRuntimeManager manager = (InternalRuntimeManager) event.getDeployedUnit().getRuntimeManager();
+ configure(manager, this);
+
+ }
+
+ @Override
+ public void onUnDeploy(DeploymentEvent event) {
+ // no-op
+ }
+
+ @Override
+ public void onActivate(DeploymentEvent event) {
+ // no-op
+
+ }
+
+ @Override
+ public void onDeactivate(DeploymentEvent event) {
+ // no-op
+
+ }
+
+
+ protected void configure(InternalRuntimeManager manager, JBPMConsumer consumer) {
+ String eventListenerType = configuration.getEventListenerType();
+ if (eventListenerType == null) {
+ return;
+ }
+
+
+ configureConsumer(eventListenerType, manager, consumer);
+
+ }
+
+ protected void configureConsumer(String eventListenerType, InternalRuntimeManager manager, JBPMConsumer consumer) {
+ LOGGER.debug("Configuring Camel JBPM Consumer for {} on runtime manager {}", eventListenerType, manager);
+
+ CacheManager cacheManager = manager.getCacheManager();
+ JBPMCamelConsumerAware consumerAware = null;
+ if (JBPMConstants.JBPM_PROCESS_EVENT_LISTENER.equals(eventListenerType)) {
+ consumerAware = (JBPMCamelConsumerAware) cacheManager.get("new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()");
+ if (consumerAware == null) {
+ consumerAware = new CamelProcessEventListener();
+ cacheManager.add("new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()", consumerAware);
+ }
+ LOGGER.debug("Configuring JBPMConsumer on process event listener {}", consumerAware);
+ } else if (JBPMConstants.JBPM_TASK_EVENT_LISTENER.equals(eventListenerType)) {
+ consumerAware = (JBPMCamelConsumerAware) cacheManager.get("new org.apache.camel.component.jbpm.listeners.CamelTaskEventListener()");
+ if (consumerAware == null) {
+ consumerAware = new CamelTaskEventListener();
+ cacheManager.add("new org.apache.camel.component.jbpm.listeners.CamelTaskEventListener()", consumerAware);
+ }
+ LOGGER.debug("Configuring JBPMConsumer on task event listener {}", consumerAware);
+ } else if (JBPMConstants.JBPM_CASE_EVENT_LISTENER.equals(eventListenerType)) {
+ consumerAware = (JBPMCamelConsumerAware) cacheManager.get("new org.apache.camel.component.jbpm.listeners.CamelCaseEventListener()");
+ if (consumerAware == null) {
+ consumerAware = new CamelCaseEventListener();
+ cacheManager.add("new org.apache.camel.component.jbpm.listeners.CamelCaseEventListener()", consumerAware);
+ }
+ LOGGER.debug("Configuring JBPMConsumer on case event listener {}", consumerAware);
+ } else if (JBPMConstants.JBPM_EVENT_EMITTER.equals(eventListenerType)) {
+ LOGGER.debug("Configuring JBPMConsumer for event emitter");
+ ServiceRegistry.get().register("CamelEventEmitter", new CamelEventEmitter(this, configuration.getEmitterSendItems()));
+
+ return;
+ }
+
+ LOGGER.debug("Adding consumer {} on {}", consumer, consumerAware);
+ consumerAware.addConsumer(consumer);
+
+ }
+
+ @Override
+ public String toString() {
+ return "JBPMConsumer [endpoint=" + endpoint + ", configuration=" + configuration + "]";
+ }
+}
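Reviewer sketch (not part of the PR): `configureConsumer` above repeats a get-or-create lookup per listener type, and an unrecognized type leaves `consumerAware` as `null`, so the final `consumerAware.addConsumer(consumer)` call would throw a `NullPointerException`. The standalone snippet below illustrates one possible shape for that lookup, with an explicit error for unknown types. `ListenerRegistrySketch`, its plain maps and its `register` method are hypothetical; the code does not use the jBPM `CacheManager` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class ListenerRegistrySketch {

    // Stand-in for the per-deployment cache; NOT the jBPM CacheManager.
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // Hypothetical table mapping a listener type ("process", "task", "case") to a factory.
    private final Map<String, Supplier<Object>> factories = new HashMap<>();

    void register(String type, Supplier<Object> factory) {
        factories.put(type, factory);
    }

    // Returns the cached listener for the type, creating it on first use.
    // Unknown types fail with a descriptive exception instead of a later null dereference.
    Object listenerFor(String type) {
        Supplier<Object> factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unsupported event listener type: " + type);
        }
        return cache.computeIfAbsent(type, key -> factory.get());
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register("process", Object::new);
        // The same instance is returned on repeated lookups.
        System.out.println(registry.listenerFor("process") == registry.listenerFor("process"));
    }
}
```

In the PR the cache keys are the MVEL expressions used in the deployment descriptor (for example `new org.apache.camel.component.jbpm.listeners.CamelProcessEventListener()`); the sketch keys by type only to stay short.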
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java
index 1a5d5e5e0d6..eb0472f94c6 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMEndpoint.java
@@ -18,6 +18,9 @@
import java.net.MalformedURLException;
import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
@@ -25,16 +28,16 @@
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
-import org.kie.api.runtime.manager.RuntimeEngine;
-import org.kie.remote.client.api.RemoteRestRuntimeEngineBuilder;
-import org.kie.services.client.api.RemoteRuntimeEngineFactory;
+import org.kie.server.client.KieServicesClient;
+import org.kie.server.client.KieServicesConfiguration;
+import org.kie.server.client.KieServicesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The jbpm component provides integration with jBPM (Business Process Management).
*/
-@UriEndpoint(firstVersion = "2.6.0", scheme = "jbpm", title = "JBPM", syntax = "jbpm:connectionURL", producerOnly = true, label = "process")
+@UriEndpoint(firstVersion = "2.6.0", scheme = "jbpm", title = "JBPM", syntax = "jbpm:connectionURL", label = "process")
public class JBPMEndpoint extends DefaultEndpoint {
private static final transient Logger LOGGER = LoggerFactory.getLogger(JBPMEndpoint.class);
@@ -47,35 +50,24 @@ public JBPMEndpoint(String uri, JBPMComponent component, JBPMConfiguration confi
}
public Producer createProducer() throws Exception {
- RemoteRestRuntimeEngineBuilder engineBuilder = RemoteRuntimeEngineFactory.newRestBuilder();
- if (configuration.getUserName() != null) {
- engineBuilder.addUserName(configuration.getUserName());
- }
- if (configuration.getPassword() != null) {
- engineBuilder.addPassword(configuration.getPassword());
- }
- if (configuration.getDeploymentId() != null) {
- engineBuilder.addDeploymentId(configuration.getDeploymentId());
- }
- if (configuration.getConnectionURL() != null) {
- engineBuilder.addUrl(configuration.getConnectionURL());
- }
- if (configuration.getProcessInstanceId() != null) {
- engineBuilder.addProcessInstanceId(configuration.getProcessInstanceId());
- }
+ KieServicesConfiguration kieConfiguration = KieServicesFactory.newRestConfiguration(configuration.getConnectionURL().toExternalForm(), configuration.getUserName(), configuration.getPassword());
+
if (configuration.getTimeout() != null) {
- engineBuilder.addTimeout(configuration.getTimeout());
+ kieConfiguration.setTimeout(configuration.getTimeout());
}
if (configuration.getExtraJaxbClasses() != null) {
- engineBuilder.addExtraJaxbClasses(configuration.getExtraJaxbClasses());
+ List<Class<?>> classes = Arrays.asList(configuration.getExtraJaxbClasses());
+ kieConfiguration.addExtraClasses(new LinkedHashSet<>(classes));
}
- RuntimeEngine runtimeEngine = engineBuilder.build();
-
- return new JBPMProducer(this, runtimeEngine);
+
+ KieServicesClient kieServerClient = KieServicesFactory.newKieServicesClient(kieConfiguration);
+ LOGGER.debug("JBPM Producer created with KieServerClient configured for {}", configuration.getConnectionURL());
+ return new JBPMProducer(this, kieServerClient);
}
public Consumer createConsumer(Processor processor) throws Exception {
- throw new UnsupportedOperationException("Consumer not supported for " + getClass().getSimpleName() + " endpoint");
+ LOGGER.debug("JBPM Consumer created and configured for deployment {}", configuration.getDeploymentId());
+ return new JBPMConsumer(this, processor);
}
public boolean isSingleton() {
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java
index 8fc9fc4fdb4..c9f218b3e94 100644
--- a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMProducer.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.jbpm;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -24,55 +26,54 @@
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ExchangeHelper;
-import org.kie.api.runtime.KieSession;
-import org.kie.api.runtime.manager.RuntimeEngine;
-import org.kie.api.runtime.process.ProcessInstance;
-import org.kie.api.task.TaskService;
-import org.kie.api.task.model.Attachment;
-import org.kie.api.task.model.Content;
-import org.kie.api.task.model.OrganizationalEntity;
-import org.kie.api.task.model.Status;
+import org.kie.api.KieServices;
+import org.kie.api.command.BatchExecutionCommand;
+import org.kie.api.command.Command;
+import org.kie.api.command.KieCommands;
+import org.kie.api.runtime.ExecutionResults;
import org.kie.api.task.model.Task;
-import org.kie.api.task.model.TaskSummary;
+import org.kie.server.api.model.ServiceResponse;
+import org.kie.server.api.model.instance.ProcessInstance;
+import org.kie.server.api.model.instance.TaskAttachment;
+import org.kie.server.api.model.instance.TaskInstance;
+import org.kie.server.api.model.instance.TaskSummary;
+import org.kie.server.client.KieServicesClient;
+import org.kie.server.client.ProcessServicesClient;
+import org.kie.server.client.QueryServicesClient;
+import org.kie.server.client.RuleServicesClient;
+import org.kie.server.client.UserTaskServicesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JBPMProducer extends DefaultProducer {
private static final transient Logger LOGGER = LoggerFactory.getLogger(JBPMProducer.class);
- private KieSession kieSession;
- private TaskService taskService;
+
+ private static KieCommands commandsFactory = KieServices.get().getCommands();
+
private JBPMConfiguration configuration;
- private RuntimeEngine runtimeEngine;
+ private KieServicesClient kieServicesClient;
+
- public JBPMProducer(JBPMEndpoint endpoint, RuntimeEngine runtimeEngine) {
+ public JBPMProducer(JBPMEndpoint endpoint, KieServicesClient kieServicesClient) {
super(endpoint);
this.configuration = endpoint.getConfiguration();
- this.runtimeEngine = runtimeEngine;
+ this.kieServicesClient = kieServicesClient;
}
@Override
protected void doStart() throws Exception {
LOGGER.trace("starting producer");
- kieSession = runtimeEngine.getKieSession();
- taskService = runtimeEngine.getTaskService();
super.doStart();
LOGGER.trace("started producer");
}
@Override
protected void doStop() throws Exception {
- super.doStop();
- if (kieSession != null) {
- kieSession = null;
- }
-
- if (taskService != null) {
- taskService = null;
- }
+ super.doStop();
}
public void process(Exchange exchange) throws Exception {
- getOperation(exchange).execute(kieSession, taskService, configuration, exchange);
+ getOperation(exchange).execute(kieServicesClient, configuration, exchange);
}
Operation getOperation(Exchange exchange) {
@@ -92,35 +93,40 @@ Operation getOperation(Exchange exchange) {
//PROCESS OPERATIONS
startProcess {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- ProcessInstance processInstance = kieSession.startProcess(getProcessId(configuration, exchange), getParameters(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+ Long processInstance = processClient.startProcess(configuration.getDeploymentId(), getProcessId(configuration, exchange), getParameters(configuration, exchange));
setResult(exchange, processInstance);
}
}, abortProcessInstance {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- kieSession.abortProcessInstance(safe(getProcessInstanceId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+ processClient.abortProcessInstance(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)));
}
}, signalEvent {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
Long processInstanceId = getProcessInstanceId(configuration, exchange);
if (processInstanceId != null) {
- kieSession.signalEvent(getEventType(configuration, exchange), getEvent(configuration, exchange), processInstanceId);
+ processClient.signalProcessInstance(configuration.getDeploymentId(), processInstanceId, getEventType(configuration, exchange), getEvent(configuration, exchange));
} else {
- kieSession.signalEvent(getEventType(configuration, exchange), getEvent(configuration, exchange));
+ processClient.signal(configuration.getDeploymentId(), getEventType(configuration, exchange), getEvent(configuration, exchange));
}
}
}, getProcessInstance {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- ProcessInstance processInstance = kieSession.getProcessInstance(safe(getProcessInstanceId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+ ProcessInstance processInstance = processClient.getProcessInstance(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)));
setResult(exchange, processInstance);
}
}, getProcessInstances {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Collection<ProcessInstance> processInstances = kieSession.getProcessInstances();
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ QueryServicesClient queryClient = kieServicesClient.getServicesClient(QueryServicesClient.class);
+ Collection<ProcessInstance> processInstances = queryClient.findProcessInstances(getPage(configuration, exchange), getPageSize(configuration, exchange));
setResult(exchange, processInstances);
}
},
@@ -128,198 +134,217 @@ void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration c
//RULE OPERATIONS
fireAllRules {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ RuleServicesClient ruleClient = kieServicesClient.getServicesClient(RuleServicesClient.class);
+ List<Command<?>> commands = new ArrayList<Command<?>>();
+ BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands);
+
Integer max = getMaxNumber(configuration, exchange);
- int rulesFired;
if (max != null) {
- rulesFired = kieSession.fireAllRules(max);
+ commands.add(commandsFactory.newFireAllRules(max));
} else {
- rulesFired = kieSession.fireAllRules();
+ commands.add(commandsFactory.newFireAllRules());
}
- setResult(exchange, rulesFired);
- }
- }, getFactCount {
- @Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- long factCount = kieSession.getFactCount();
- setResult(exchange, factCount);
+ ServiceResponse<ExecutionResults> reply = ruleClient.executeCommandsWithResults(configuration.getDeploymentId(), executionCommand);
+ setResult(exchange, reply.getResult());
}
}, getGlobal {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Object global = kieSession.getGlobal(getIdentifier(configuration, exchange));
- setResult(exchange, global);
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ RuleServicesClient ruleClient = kieServicesClient.getServicesClient(RuleServicesClient.class);
+ List<Command<?>> commands = new ArrayList<Command<?>>();
+ BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands);
+ String identifier = getIdentifier(configuration, exchange);
+ commands.add(commandsFactory.newGetGlobal(identifier, identifier));
+
+ ServiceResponse<ExecutionResults> reply = ruleClient.executeCommandsWithResults(configuration.getDeploymentId(), executionCommand);
+ setResult(exchange, reply.getResult().getValue(identifier));
}
}, setGlobal {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- kieSession.setGlobal(getIdentifier(configuration, exchange), getValue(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ RuleServicesClient ruleClient = kieServicesClient.getServicesClient(RuleServicesClient.class);
+ List<Command<?>> commands = new ArrayList<Command<?>>();
+ BatchExecutionCommand executionCommand = commandsFactory.newBatchExecution(commands);
+
+ commands.add(commandsFactory.newSetGlobal(getIdentifier(configuration, exchange), getValue(configuration, exchange)));
+
+ ruleClient.executeCommandsWithResults(configuration.getDeploymentId(), executionCommand);
}
},
//WORK ITEM OPERATIONS
abortWorkItem {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- kieSession.getWorkItemManager().abortWorkItem(safe(getWorkItemId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+ processClient.abortWorkItem(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)), safe(getWorkItemId(configuration, exchange)));
}
}, completeWorkItem {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- kieSession.getWorkItemManager().completeWorkItem(safe(getWorkItemId(configuration, exchange)), getParameters(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ ProcessServicesClient processClient = kieServicesClient.getServicesClient(ProcessServicesClient.class);
+ processClient.completeWorkItem(configuration.getDeploymentId(), safe(getProcessInstanceId(configuration, exchange)), safe(getWorkItemId(configuration, exchange)), getParameters(configuration, exchange));
}
},
//TASK OPERATIONS
activateTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.activate(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
- }
- }, addTask {
- @Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- long taskId = taskService.addTask(getTask(configuration, exchange), getParameters(configuration, exchange));
- setResult(exchange, taskId);
- }
- }, claimNextAvailableTask {
- @Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.claimNextAvailable(getUserId(configuration, exchange), getLanguage(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.activateTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, claimTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.claim(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.claimTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, completeTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.complete(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.completeAutoProgress(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
}
}, delegateTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.delegate(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getTargetUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.delegateTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getTargetUserId(configuration, exchange));
}
}, exitTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.exit(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.exitTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, failTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.fail(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.failTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getParameters(configuration, exchange));
}
}, getAttachment {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Attachment attachment = taskService.getAttachmentById(safe(getAttachmentId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ TaskAttachment attachment = taskClient.getTaskAttachmentById(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), safe(getAttachmentId(configuration, exchange)));
setResult(exchange, attachment);
}
- }, getContent {
- @Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Content content = taskService.getContentById(safe(getContentId(configuration, exchange)));
- setResult(exchange, content);
- }
}, getTasksAssignedAsBusinessAdministrator {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- List<TaskSummary> taskSummaries = taskService.getTasksAssignedAsBusinessAdministrator(getUserId(configuration, exchange), getLanguage(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ List<TaskSummary> taskSummaries = taskClient.findTasksAssignedAsBusinessAdministrator(getUserId(configuration, exchange), getPage(configuration, exchange), getPageSize(configuration, exchange));
setResult(exchange, taskSummaries);
}
}, getTasksAssignedAsPotentialOwnerByStatus {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.getTasksAssignedAsPotentialOwnerByStatus(getUserId(configuration, exchange), getStatuses(configuration, exchange), getLanguage(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ List<TaskSummary> taskSummaries = taskClient.findTasksAssignedAsPotentialOwner(getUserId(configuration, exchange), getStatuses(configuration, exchange), getPage(configuration, exchange), getPageSize(configuration, exchange));
+ setResult(exchange, taskSummaries);
}
}, getTaskByWorkItem {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Task task = taskService.getTaskByWorkItemId(safe(getWorkItemId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ TaskInstance task = taskClient.findTaskByWorkItemId(safe(getWorkItemId(configuration, exchange)));
setResult(exchange, task);
}
}, getTaskBy {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Task task = taskService.getTaskById(safe(getTaskId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ TaskInstance task = taskClient.findTaskById(safe(getTaskId(configuration, exchange)));
setResult(exchange, task);
}
}, getTaskContent {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- Map<String, Object> taskContent = taskService.getTaskContent(safe(getTaskId(configuration, exchange)));
- setResult(exchange, taskContent);
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ Map<String, Object> content = taskClient.getTaskOutputContentByTaskId(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)));
+ setResult(exchange, content);
}
}, getTasksByProcessInstance {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- List<Long> processInstanceIds = taskService.getTasksByProcessInstanceId(safe(getProcessInstanceId(configuration, exchange)));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ List<TaskSummary> processInstanceIds = taskClient.findTasksByStatusByProcessInstanceId(safe(getProcessInstanceId(configuration, exchange)), Collections.emptyList(),
+ getPage(configuration, exchange), getPageSize(configuration, exchange));
setResult(exchange, processInstanceIds);
}
}, getTasksByStatusByProcessInstance {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- List<TaskSummary> taskSummaryList = taskService.getTasksByStatusByProcessInstanceId(
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ List<TaskSummary> taskSummaryList = taskClient.findTasksByStatusByProcessInstanceId(
safe(getProcessInstanceId(configuration, exchange)),
getStatuses(configuration, exchange),
- getLanguage(configuration, exchange));
+ getPage(configuration, exchange), getPageSize(configuration, exchange));
setResult(exchange, taskSummaryList);
}
}, getTasksOwned {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- List<TaskSummary> summaryList = taskService.getTasksOwned(getUserId(configuration, exchange), getLanguage(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ List<TaskSummary> summaryList = taskClient.findTasksOwned(getUserId(configuration, exchange), getPage(configuration, exchange), getPageSize(configuration, exchange));
setResult(exchange, summaryList);
}
}, nominateTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.nominate(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getEntities(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.nominateTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange), getEntities(configuration, exchange));
}
}, releaseTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.release(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.releaseTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, resumeTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.resume(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.resumeTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, skipTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.skip(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.skipTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, startTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.start(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.startTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, stopTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.stop(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.stopTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
}, suspendTask {
@Override
- void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange) {
- taskService.suspend(safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
+ void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange) {
+ UserTaskServicesClient taskClient = kieServicesClient.getServicesClient(UserTaskServicesClient.class);
+ taskClient.suspendTask(configuration.getDeploymentId(), safe(getTaskId(configuration, exchange)), getUserId(configuration, exchange));
}
};
- List<Status> getStatuses(JBPMConfiguration configuration, Exchange exchange) {
- List<Status> statusList = exchange.getIn().getHeader(JBPMConstants.STATUS_LIST, List.class);
+ List<String> getStatuses(JBPMConfiguration configuration, Exchange exchange) {
+ List<String> statusList = exchange.getIn().getHeader(JBPMConstants.STATUS_LIST, List.class);
if (statusList == null) {
statusList = configuration.getStatuses();
}
return statusList;
}
- List<OrganizationalEntity> getEntities(JBPMConfiguration configuration, Exchange exchange) {
- List<OrganizationalEntity> entityList = exchange.getIn().getHeader(JBPMConstants.ENTITY_LIST, List.class);
+ List<String> getEntities(JBPMConfiguration configuration, Exchange exchange) {
+ List<String> entityList = exchange.getIn().getHeader(JBPMConstants.ENTITY_LIST, List.class);
if (entityList == null) {
entityList = configuration.getEntities();
}
@@ -350,12 +375,20 @@ String getTargetUserId(JBPMConfiguration configuration, Exchange exchange) {
return userId;
}
- String getLanguage(JBPMConfiguration configuration, Exchange exchange) {
- String language = exchange.getIn().getHeader(JBPMConstants.LANGUAGE, String.class);
- if (language == null) {
- language = configuration.getLanguage();
+ Integer getPage(JBPMConfiguration configuration, Exchange exchange) {
+ Integer page = exchange.getIn().getHeader(JBPMConstants.RESULT_PAGE, Integer.class);
+ if (page == null) {
+ page = configuration.getPage();
+ }
+ return page;
+ }
+
+ Integer getPageSize(JBPMConfiguration configuration, Exchange exchange) {
+ Integer pageSize = exchange.getIn().getHeader(JBPMConstants.RESULT_PAGE_SIZE, Integer.class);
+ if (pageSize == null) {
+ pageSize = configuration.getPageSize();
}
- return language;
+ return pageSize;
}
Task getTask(JBPMConfiguration configuration, Exchange exchange) {
@@ -466,7 +499,7 @@ void setResult(Exchange exchange, Object result) {
getResultMessage(exchange).setBody(result);
}
- abstract void execute(KieSession kieSession, TaskService taskService, JBPMConfiguration configuration, Exchange exchange);
+ abstract void execute(KieServicesClient kieServicesClient, JBPMConfiguration configuration, Exchange exchange);
}
}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/CamelEventEmitter.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/CamelEventEmitter.java
new file mode 100644
index 00000000000..0886466f0f8
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/CamelEventEmitter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.emitters;
+
+import java.util.Collection;
+
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.jbpm.persistence.api.integration.EventCollection;
+import org.jbpm.persistence.api.integration.EventEmitter;
+import org.jbpm.persistence.api.integration.InstanceView;
+import org.jbpm.persistence.api.integration.base.BaseEventCollection;
+
+public class CamelEventEmitter implements EventEmitter {
+
+ private JBPMConsumer consumer;
+ private boolean sendItems;
+
+ public CamelEventEmitter(JBPMConsumer consumer, boolean sendItems) {
+ this.consumer = consumer;
+ this.sendItems = sendItems;
+ }
+
+ @Override
+ public void deliver(Collection<InstanceView<?>> data) {
+ // no-op
+
+ }
+
+ @Override
+ public void apply(Collection<InstanceView<?>> data) {
+ if (consumer == null || data.isEmpty()) {
+ return;
+ }
+
+ if (sendItems) {
+
+ data.forEach(item -> consumer.sendMessage("Emitter", item));
+ } else {
+
+ consumer.sendMessage("Emitter", data);
+ }
+ }
+
+ @Override
+ public void drop(Collection<InstanceView<?>> data) {
+ // no-op
+
+ }
+
+ @Override
+ public EventCollection newCollection() {
+ return new BaseEventCollection();
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/ServiceRegistryBoundEventEmitter.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/ServiceRegistryBoundEventEmitter.java
new file mode 100644
index 00000000000..c531aa783d7
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/emitters/ServiceRegistryBoundEventEmitter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.emitters;
+
+import java.util.Collection;
+
+import org.jbpm.persistence.api.integration.EventCollection;
+import org.jbpm.persistence.api.integration.EventEmitter;
+import org.jbpm.persistence.api.integration.InstanceView;
+import org.jbpm.services.api.service.ServiceRegistry;
+
+public class ServiceRegistryBoundEventEmitter implements EventEmitter {
+
+ private EventEmitter delegate;
+
+ public ServiceRegistryBoundEventEmitter() {
+ this.delegate = (EventEmitter) ServiceRegistry.get().service("CamelEventEmitter");
+ }
+
+ @Override
+ public void deliver(Collection<InstanceView<?>> data) {
+ delegate.deliver(data);
+
+ }
+
+ @Override
+ public void apply(Collection<InstanceView<?>> data) {
+ delegate.apply(data);
+ }
+
+ @Override
+ public void drop(Collection<InstanceView<?>> data) {
+ delegate.drop(data);
+
+ }
+
+ @Override
+ public EventCollection newCollection() {
+ return delegate.newCollection();
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelCaseEventListener.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelCaseEventListener.java
new file mode 100644
index 00000000000..4906b5b9b87
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelCaseEventListener.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.listeners;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.component.jbpm.JBPMCamelConsumerAware;
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.jbpm.casemgmt.api.event.CaseCancelEvent;
+import org.jbpm.casemgmt.api.event.CaseCloseEvent;
+import org.jbpm.casemgmt.api.event.CaseCommentEvent;
+import org.jbpm.casemgmt.api.event.CaseDataEvent;
+import org.jbpm.casemgmt.api.event.CaseDestroyEvent;
+import org.jbpm.casemgmt.api.event.CaseDynamicSubprocessEvent;
+import org.jbpm.casemgmt.api.event.CaseDynamicTaskEvent;
+import org.jbpm.casemgmt.api.event.CaseEventListener;
+import org.jbpm.casemgmt.api.event.CaseReopenEvent;
+import org.jbpm.casemgmt.api.event.CaseRoleAssignmentEvent;
+import org.jbpm.casemgmt.api.event.CaseStartEvent;
+import org.kie.internal.runtime.Cacheable;
+
+
+public class CamelCaseEventListener implements CaseEventListener, Cacheable, JBPMCamelConsumerAware {
+
+ private Set<JBPMConsumer> consumers = new LinkedHashSet<>();
+
+ @Override
+ public void beforeCaseStarted(CaseStartEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseStarted", event);
+ }
+
+ @Override
+ public void afterCaseStarted(CaseStartEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseStarted", event);
+ }
+
+ @Override
+ public void beforeCaseClosed(CaseCloseEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseClosed", event);
+ }
+
+ @Override
+ public void afterCaseClosed(CaseCloseEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseClosed", event);
+ }
+
+ @Override
+ public void beforeCaseCancelled(CaseCancelEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseCancelled", event);
+ }
+
+ @Override
+ public void afterCaseCancelled(CaseCancelEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseCancelled", event);
+ }
+
+ @Override
+ public void beforeCaseDestroyed(CaseDestroyEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseDestroyed", event);
+ }
+
+ @Override
+ public void afterCaseDestroyed(CaseDestroyEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseDestroyed", event);
+ }
+
+ @Override
+ public void beforeCaseReopen(CaseReopenEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseReopen", event);
+ }
+
+ @Override
+ public void afterCaseReopen(CaseReopenEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseReopen", event);
+ }
+
+ @Override
+ public void beforeCaseCommentAdded(CaseCommentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseCommentAdded", event);
+ }
+
+ @Override
+ public void afterCaseCommentAdded(CaseCommentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseCommentAdded", event);
+ }
+
+ @Override
+ public void beforeCaseCommentUpdated(CaseCommentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseCommentUpdated", event);
+ }
+
+ @Override
+ public void afterCaseCommentUpdated(CaseCommentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseCommentUpdated", event);
+ }
+
+ @Override
+ public void beforeCaseCommentRemoved(CaseCommentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseCommentRemoved", event);
+ }
+
+ @Override
+ public void afterCaseCommentRemoved(CaseCommentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseCommentRemoved", event);
+ }
+
+ @Override
+ public void beforeCaseRoleAssignmentAdded(CaseRoleAssignmentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseRoleAssignmentAdded", event);
+ }
+
+ @Override
+ public void afterCaseRoleAssignmentAdded(CaseRoleAssignmentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseRoleAssignmentAdded", event);
+ }
+
+ @Override
+ public void beforeCaseRoleAssignmentRemoved(CaseRoleAssignmentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseRoleAssignmentRemoved", event);
+ }
+
+ @Override
+ public void afterCaseRoleAssignmentRemoved(CaseRoleAssignmentEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseRoleAssignmentRemoved", event);
+ }
+
+ @Override
+ public void beforeCaseDataAdded(CaseDataEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseDataAdded", event);
+ }
+
+ @Override
+ public void afterCaseDataAdded(CaseDataEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseDataAdded", event);
+ }
+
+ @Override
+ public void beforeCaseDataRemoved(CaseDataEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeCaseDataRemoved", event);
+ }
+
+ @Override
+ public void afterCaseDataRemoved(CaseDataEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterCaseDataRemoved", event);
+ }
+
+ @Override
+ public void beforeDynamicTaskAdded(CaseDynamicTaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeDynamicTaskAdded", event);
+ }
+
+ @Override
+ public void afterDynamicTaskAdded(CaseDynamicTaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterDynamicTaskAdded", event);
+ }
+
+ @Override
+ public void beforeDynamicProcessAdded(CaseDynamicSubprocessEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeDynamicProcessAdded", event);
+ }
+
+ @Override
+ public void afterDynamicProcessAdded(CaseDynamicSubprocessEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterDynamicProcessAdded", event);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void addConsumer(JBPMConsumer consumer) {
+ this.consumers.add(consumer);
+ }
+
+ @Override
+ public void removeConsumer(JBPMConsumer consumer) {
+ this.consumers.remove(consumer);
+ }
+
+ protected void sendMessage(String eventType, Object event) {
+ this.consumers.stream().filter(c -> c.getStatus().isStarted()).forEach(c -> c.sendMessage(eventType, event));
+ }
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelProcessEventListener.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelProcessEventListener.java
new file mode 100644
index 00000000000..2e6ff08af69
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelProcessEventListener.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.listeners;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.component.jbpm.JBPMCamelConsumerAware;
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.kie.api.event.process.ProcessCompletedEvent;
+import org.kie.api.event.process.ProcessEventListener;
+import org.kie.api.event.process.ProcessNodeLeftEvent;
+import org.kie.api.event.process.ProcessNodeTriggeredEvent;
+import org.kie.api.event.process.ProcessStartedEvent;
+import org.kie.api.event.process.ProcessVariableChangedEvent;
+import org.kie.internal.runtime.Cacheable;
+
+
+public class CamelProcessEventListener implements ProcessEventListener, Cacheable, JBPMCamelConsumerAware {
+
+ private Set<JBPMConsumer> consumers = new LinkedHashSet<>();
+
+ @Override
+ public void beforeProcessStarted(ProcessStartedEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+
+ sendMessage("beforeProcessStarted", event);
+ }
+
+ @Override
+ public void afterProcessStarted(ProcessStartedEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+
+ sendMessage("afterProcessStarted", event);
+ }
+
+ @Override
+ public void beforeProcessCompleted(ProcessCompletedEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeProcessCompleted", event);
+ }
+
+ @Override
+ public void afterProcessCompleted(ProcessCompletedEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterProcessCompleted", event);
+ }
+
+ @Override
+ public void beforeNodeTriggered(ProcessNodeTriggeredEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeNodeTriggered", event);
+ }
+
+ @Override
+ public void afterNodeTriggered(ProcessNodeTriggeredEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterNodeTriggered", event);
+ }
+
+ @Override
+ public void beforeNodeLeft(ProcessNodeLeftEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeNodeLeft", event);
+ }
+
+ @Override
+ public void afterNodeLeft(ProcessNodeLeftEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterNodeLeft", event);
+ }
+
+ @Override
+ public void beforeVariableChanged(ProcessVariableChangedEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeVariableChanged", event);
+ }
+
+ @Override
+ public void afterVariableChanged(ProcessVariableChangedEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterVariableChanged", event);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void addConsumer(JBPMConsumer consumer) {
+ this.consumers.add(consumer);
+ }
+
+ @Override
+ public void removeConsumer(JBPMConsumer consumer) {
+ this.consumers.remove(consumer);
+ }
+
+ protected void sendMessage(String eventType, Object event) {
+ this.consumers.stream().filter(c -> c.getStatus().isStarted()).forEach(c -> c.sendMessage(eventType, event));
+ }
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelTaskEventListener.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelTaskEventListener.java
new file mode 100644
index 00000000000..67a93b7d2e5
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/listeners/CamelTaskEventListener.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.listeners;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.component.jbpm.JBPMCamelConsumerAware;
+import org.apache.camel.component.jbpm.JBPMConsumer;
+import org.kie.api.task.TaskEvent;
+import org.kie.api.task.TaskLifeCycleEventListener;
+import org.kie.internal.runtime.Cacheable;
+
+
+public class CamelTaskEventListener implements Cacheable, TaskLifeCycleEventListener, JBPMCamelConsumerAware {
+
+ private Set<JBPMConsumer> consumers = new LinkedHashSet<>();
+
+ @Override
+ public void beforeTaskActivatedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskActivatedEvent", event);
+ }
+
+ @Override
+ public void beforeTaskClaimedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskClaimedEvent", event);
+ }
+
+ @Override
+ public void beforeTaskSkippedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskSkippedEvent", event);
+ }
+
+ @Override
+ public void beforeTaskStartedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskStartedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskStoppedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskStoppedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskCompletedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskCompletedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskFailedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskFailedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskAddedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskAddedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskExitedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskExitedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskReleasedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskReleasedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskResumedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskResumedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskSuspendedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskSuspendedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskForwardedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskForwardedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskDelegatedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskDelegatedEvent", event);
+
+ }
+
+ @Override
+ public void beforeTaskNominatedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("beforeTaskNominatedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskActivatedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskActivatedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskClaimedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskClaimedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskSkippedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskSkippedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskStartedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskStartedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskStoppedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskStoppedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskCompletedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskCompletedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskFailedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskFailedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskAddedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskAddedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskExitedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskExitedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskReleasedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskReleasedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskResumedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskResumedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskSuspendedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskSuspendedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskForwardedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskForwardedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskDelegatedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskDelegatedEvent", event);
+
+ }
+
+ @Override
+ public void afterTaskNominatedEvent(TaskEvent event) {
+ if (consumers.isEmpty()) {
+ return;
+ }
+ sendMessage("afterTaskNominatedEvent", event);
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void addConsumer(JBPMConsumer consumer) {
+ this.consumers.add(consumer);
+ }
+
+ @Override
+ public void removeConsumer(JBPMConsumer consumer) {
+ this.consumers.remove(consumer);
+ }
+
+ protected void sendMessage(String eventType, Object event) {
+ this.consumers.stream().filter(c -> c.getStatus().isStarted()).forEach(c -> c.sendMessage(eventType, event));
+ }
+
+
+}
diff --git a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/server/CamelKieServerExtension.java b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/server/CamelKieServerExtension.java
new file mode 100644
index 00000000000..3a8a0ea9bb0
--- /dev/null
+++ b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/server/CamelKieServerExtension.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.jbpm.server;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.model.FromDefinition;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RoutesDefinition;
+import org.jbpm.services.api.service.ServiceRegistry;
+import org.kie.server.services.api.KieContainerInstance;
+import org.kie.server.services.api.KieServerExtension;
+import org.kie.server.services.api.KieServerRegistry;
+import org.kie.server.services.api.SupportedTransports;
+import org.kie.server.services.impl.KieServerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CamelKieServerExtension implements KieServerExtension {
+ public static final String EXTENSION_NAME = "Camel";
+
+ private static final Logger logger = LoggerFactory.getLogger(CamelKieServerExtension.class);
+
+ private static final Boolean disabled = Boolean.parseBoolean(System.getProperty("org.camel.server.ext.disabled", "false"));
+
+ protected DefaultCamelContext camel;
+
+ protected boolean managedCamel;
+
+ protected Map<String, DefaultCamelContext> camelContexts = new HashMap<>();
+
+ public CamelKieServerExtension() {
+ this.managedCamel = true;
+ }
+
+ public CamelKieServerExtension(DefaultCamelContext camel) {
+ this.camel = camel;
+ this.managedCamel = false;
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return camel != null;
+ }
+
+ @Override
+ public boolean isActive() {
+ return disabled == false;
+ }
+
+ @Override
+ public void init(KieServerImpl kieServer, KieServerRegistry registry) {
+ if (this.managedCamel && this.camel == null) {
+ this.camel = new DefaultCamelContext();
+ this.camel.setName("KIE Server Camel context");
+
+ try (InputStream is = this.getClass().getResourceAsStream("/global-camel-routes.xml")) {
+ if (is != null) {
+
+ RoutesDefinition routes = camel.loadRoutesDefinition(is);
+ camel.addRouteDefinitions(routes.getRoutes());
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding Camel context for KIE Server", e);
+ }
+ }
+
+ ServiceRegistry.get().register("GlobalCamelService", this.camel);
+ }
+
+ @Override
+ public void destroy(KieServerImpl kieServer, KieServerRegistry registry) {
+ ServiceRegistry.get().remove("GlobalCamelService");
+
+ if (this.managedCamel && this.camel != null) {
+ try {
+ this.camel.stop();
+ } catch (Exception e) {
+ logger.error("Failed to stop KIE Server extension {}", EXTENSION_NAME, e);
+ }
+ }
+ }
+
+ @Override
+ public void createContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+
+ ClassLoader classloader = kieContainerInstance.getKieContainer().getClassLoader();
+ try (InputStream is = classloader.getResourceAsStream("camel-routes.xml")) {
+ if (is != null) {
+
+ DefaultCamelContext context = new DefaultCamelContext();
+ context.setName("KIE Server Camel context for container " + kieContainerInstance.getContainerId());
+
+ RoutesDefinition routes = context.loadRoutesDefinition(is);
+ annotateKJarRoutes(routes, id);
+ context.addRouteDefinitions(routes.getRoutes());
+ context.start();
+ camelContexts.put(id, context);
+
+ ServiceRegistry.get().register(id + "_CamelService", context);
+
+ }
+ } catch (Exception e) {
+ logger.error("Error while adding Camel context for {}", kieContainerInstance.getContainerId(), e);
+ }
+ }
+
+ @Override
+ public void updateContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+ disposeContainer(id, kieContainerInstance, parameters);
+ createContainer(id, kieContainerInstance, parameters);
+ }
+
+ @Override
+ public boolean isUpdateContainerAllowed(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+ return true;
+ }
+
+ @Override
+ public void disposeContainer(String id, KieContainerInstance kieContainerInstance, Map<String, Object> parameters) {
+ DefaultCamelContext context = camelContexts.remove(id);
+
+ if (context != null) {
+
+ ServiceRegistry.get().remove(id + "_CamelService");
+ try {
+ context.stop();
+ } catch (Exception e) {
+ logger.error("Error while removing Camel context for container {}", id, e);
+ }
+ }
+ }
+
+ @Override
+ public List<Object> getAppComponents(SupportedTransports type) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <T> T getAppComponents(Class<T> serviceType) {
+ return null;
+ }
+
+ @Override
+ public String getImplementedCapability() {
+ return "Integration";
+ }
+
+ @Override
+ public List<Object> getServices() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String getExtensionName() {
+ return EXTENSION_NAME;
+ }
+
+ @Override
+ public Integer getStartOrder() {
+ return 50;
+ }
+
+ @Override
+ public void serverStarted() {
+ if (this.managedCamel && this.camel != null && !this.camel.isStarted()) {
+ try {
+ this.camel.start();
+ } catch (Exception e) {
+ logger.error("Failed to start Camel context", e);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return EXTENSION_NAME + " KIE Server extension";
+ }
+
+ protected void annotateKJarRoutes(RoutesDefinition routes, String deploymentId) {
+ for (RouteDefinition route : routes.getRoutes()) {
+
+ for (FromDefinition from : route.getInputs()) {
+
+ if (from.getUri().startsWith("jbpm:events") && !from.getUri().contains("deploymentId")) {
+ StringBuilder uri = new StringBuilder(from.getUri());
+
+ String[] split = from.getUri().split("\\?");
+ if (split.length == 1) {
+ // no query given
+ uri.append("?");
+ } else {
+ // query params already exist
+ uri.append("&");
+ }
+ uri.append("deploymentId=").append(deploymentId);
+ from.setUri(uri.toString());
+ }
+
+ logger.debug("Annotated route input URI: {}", from.getUri());
+ }
+ }
+ }
+}
diff --git a/components/camel-jbpm/src/main/resources/META-INF/services/org.kie.server.services.api.KieServerExtension b/components/camel-jbpm/src/main/resources/META-INF/services/org.kie.server.services.api.KieServerExtension
new file mode 100644
index 00000000000..a3aed9f0f76
--- /dev/null
+++ b/components/camel-jbpm/src/main/resources/META-INF/services/org.kie.server.services.api.KieServerExtension
@@ -0,0 +1 @@
+org.apache.camel.component.jbpm.server.CamelKieServerExtension
\ No newline at end of file
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java
index 987b0a6b912..18e1055a94d 100644
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java
+++ b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/JBPMComponentIntegrationTest.java
@@ -17,21 +17,74 @@
package org.apache.camel.component.jbpm;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jbpm.JBPMProducer.Operation;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Ignore;
import org.junit.Test;
+import org.kie.server.api.model.instance.TaskSummary;
-@Ignore("This is an integration test that needs BPMS running on the local machine")
+/**
+ * To run this test you need jBPM running locally; the easiest way is to use the
+ * single zip distribution, which can be downloaded from jbpm.org.
+ *
+ * Next, start it and import the Evaluation sample project, then build and deploy it.
+ * Once that is done, this test can be run out of the box.
+ */
+@Ignore("This is an integration test that needs jBPM running on the local machine")
public class JBPMComponentIntegrationTest extends CamelTestSupport {
+ @SuppressWarnings("unchecked")
@Test
public void interactsOverRest() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(1);
- template.sendBodyAndHeader("direct:start", null, JBPMConstants.PROCESS_ID, "project1.integration-test");
+
+ // let's start process instance for evaluation process
+ Map<String, Object> params = new HashMap<>();
+ params.put("employee", "wbadmin");
+ params.put("reason", "Camel asks for it");
+
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(JBPMConstants.PROCESS_ID, "evaluation");
+ headers.put(JBPMConstants.PARAMETERS, params);
+
+ template.sendBodyAndHeaders("direct:start", null, headers);
+ assertMockEndpointsSatisfied();
+ Long processInstanceId = (Long) getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody();
+ assertNotNull(processInstanceId);
+
+ // now let's collect user tasks
+ headers = new HashMap<>();
+ headers.put(JBPMConstants.OPERATION, JBPMConstants.OPERATION + Operation.getTasksOwned);
+
+ template.sendBodyAndHeaders("direct:start", null, headers);
+ getMockEndpoint("mock:result").expectedMessageCount(2);
+ assertMockEndpointsSatisfied();
+
+ List<TaskSummary> tasks = (List<TaskSummary>) getMockEndpoint("mock:result").getExchanges().get(1).getIn().getBody();
+ assertEquals(1, tasks.size());
+
+ // let's complete first user task
+ headers = new HashMap<>();
+ headers.put(JBPMConstants.TASK_ID, tasks.get(0).getId());
+ headers.put(JBPMConstants.OPERATION, JBPMConstants.OPERATION + Operation.completeTask);
+
+ template.sendBodyAndHeaders("direct:start", null, headers);
+ getMockEndpoint("mock:result").expectedMessageCount(3);
+ assertMockEndpointsSatisfied();
+
+ // lastly let's abort process instance we just created
+ headers = new HashMap<>();
+ headers.put(JBPMConstants.PROCESS_INSTANCE_ID, processInstanceId);
+ headers.put(JBPMConstants.OPERATION, JBPMConstants.OPERATION + Operation.abortProcessInstance);
+
+ template.sendBodyAndHeaders("direct:start", null, headers);
+ getMockEndpoint("mock:result").expectedMessageCount(4);
assertMockEndpointsSatisfied();
-
- assertNotNull(getMockEndpoint("mock:result").getExchanges().get(0).getIn().getBody());
}
@Override
@@ -40,8 +93,8 @@ protected RouteBuilder createRouteBuilder() throws Exception {
@Override
public void configure() {
from("direct:start")
- .to("jbpm:http://localhost:8080/business-central?userName=bpmsAdmin&password=pa$word1"
- + "&deploymentId=org.kie.example:project1:1.0.0-SNAPSHOT")
+ .to("jbpm:http://localhost:8080/kie-server/services/rest/server?userName=wbadmin&password=wbadmin"
+ + "&deploymentId=evaluation")
.to("mock:result");
}
};
diff --git a/parent/pom.xml b/parent/pom.xml
index 398800c7861..2ea57782a80 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -362,7 +362,7 @@
<javax.ws.rs-api-version>2.0.1</javax.ws.rs-api-version>
<jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
<jaxen-version>1.1.6</jaxen-version>
- <jbpm-version>6.5.0.Final</jbpm-version>
+ <jbpm-version>7.14.0.Final</jbpm-version>
<jboss-javaee-6-version>1.0.0.Final</jboss-javaee-6-version>
<jboss-logging-version>3.3.2.Final</jboss-logging-version>
<jboss-marshalling-version>1.4.10.Final</jboss-marshalling-version>
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 37c5862a9b0..1b36a889a6b 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1149,27 +1149,41 @@
<details>The camel-jbpm feature can only run on a SUN JVM. You need to add the package com.sun.tools.xjc to the java platform packages in the etc/jre.properties file.</details>
<feature version='${project.version}'>camel-core</feature>
<feature>transaction</feature>
- <bundle dependency='true'>mvn:org.openengsb.wrapped/com.google.protobuf/2.4.1.w1</bundle>
- <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.javax-inject/${javax-inject-bundle-version}</bundle>
- <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-jaxrs/${jackson-version}</bundle>
- <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-core-asl/${jackson-version}</bundle>
- <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-mapper-asl/${jackson-version}</bundle>
- <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-xc/${jackson-version}</bundle>
- <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/${geronimo-jms-spec-version}</bundle>
- <bundle dependency='true'>mvn:org.apache.servicemix.specs/org.apache.servicemix.specs.jaxws-api-2.2/${servicemix-specs-version}</bundle>
+ <feature version='${cxf-version-range}'>cxf-jaxrs</feature>
+ <bundle dependency='true'>mvn:org.kie.server/kie-server-api/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie.server/kie-server-common/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie.server/kie-server-client/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie.soup/kie-soup-maven-support/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie.soup/kie-soup-project-datamodel-api/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie.soup/kie-soup-project-datamodel-commons/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie.soup/kie-soup-commons/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie/kie-api/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie/kie-internal/${jbpm-version}</bundle>
<bundle dependency='true'>mvn:org.drools/drools-core/${jbpm-version}</bundle>
<bundle dependency='true'>mvn:org.drools/drools-compiler/${jbpm-version}</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie/kie-internal/${jbpm-version}</bundle>
- <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xstream-java8/${xstream-bundle-version}</bundle>
+ <bundle dependency='true'>mvn:org.mvel/mvel2/${mvel-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie/kie-dmn-model/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.kie/kie-dmn-api/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.optaplanner/optaplanner-core/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-common/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-jaxb/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-jackson/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:org.optaplanner/optaplanner-persistence-xstream/${jbpm-version}</bundle>
+ <bundle dependency='true'>mvn:com.google.protobuf/protobuf-java/${protobuf-version}</bundle>
+ <bundle dependency='true'>mvn:com.google.guava/guava/${google-guava-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.commons/commons-math3/${commons-math3-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.commons/commons-lang3/${commons-lang3-version}</bundle>
<bundle dependency='true'>mvn:commons-codec/commons-codec/${commons-codec-version}</bundle>
- <bundle dependency='true'>mvn:org.mvel/mvel2/${mvel-version}</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie/kie-api/${jbpm-version}</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-remote-common/${jbpm-version}</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie.remote.ws/kie-remote-ws-common/${jbpm-version}</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-remote-jaxb/${jbpm-version}</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-services-client/6.1.0.Final</bundle>
- <bundle dependency='true'>wrap:mvn:org.kie.remote/kie-remote-client/${jbpm-version}</bundle>
- <bundle>mvn:org.apache.camel/camel-jbpm/${project.version}</bundle>
+ <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-annotations/${jackson2-version}</bundle>
+ <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-core/${jackson2-version}</bundle>
+ <bundle dependency='true'>mvn:com.fasterxml.jackson.core/jackson-databind/${jackson2-version}</bundle>
+ <bundle dependency='true'>mvn:com.fasterxml.jackson.module/jackson-module-jaxb-annotations/${jackson2-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.servicemix.specs/org.apache.servicemix.specs.jaxws-api-2.2/${servicemix-specs-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xstream-java8/${xstream-bundle-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.xpp3/${xpp3-bundle-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.reflections/${reflections-bundle-version}</bundle>
+ <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jaxb-xjc/${jaxb-bundle-version}</bundle>
+ <bundle>mvn:org.apache.camel/camel-jbpm/${project.version}</bundle>
</feature>
<feature name='camel-jcache' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
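
Note on the deploymentId handling in the server extension hunk above: the
patch appends a deploymentId query parameter to a route's "from" URI when the
URI does not already mention one, using "?" if there is no query string yet
and "&" otherwise. The following is only a standalone sketch of that idea, not
the code from the patch. The class name DeploymentIdUri, the method
withDeploymentId and the sample URIs are hypothetical, and the sketch uses
indexOf instead of the patch's split("\\?") to pick the separator.

```java
// Hypothetical helper, not part of the patch: illustrates appending a
// deploymentId query parameter to an endpoint URI.
final class DeploymentIdUri {

    private DeploymentIdUri() {
    }

    // Returns the URI unchanged if it already mentions deploymentId,
    // otherwise appends deploymentId=<value> as a query parameter.
    static String withDeploymentId(String uri, String deploymentId) {
        if (uri.contains("deploymentId")) {
            return uri;
        }
        // '?' starts a new query string, '&' extends an existing one
        char separator = uri.indexOf('?') < 0 ? '?' : '&';
        return uri + separator + "deploymentId=" + deploymentId;
    }

    public static void main(String[] args) {
        // Placeholder URIs, chosen only for illustration
        System.out.println(withDeploymentId("jbpm:example", "evaluation"));
        System.out.println(withDeploymentId("jbpm:example?foo=bar", "evaluation"));
    }
}
```

As in the patch, the "already present" check is a plain substring match, so a
URI that contains the text deploymentId anywhere is left untouched.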
> Upgrade jBPM component to use 7 series with consumer capability to react to
> produced events by jBPM
> ---------------------------------------------------------------------------------------------------
>
> Key: CAMEL-12931
> URL: https://issues.apache.org/jira/browse/CAMEL-12931
> Project: Camel
> Issue Type: Improvement
> Components: camel-jbpm
> Affects Versions: 2.22.1
> Reporter: Maciej Swiderski
> Assignee: Andrea Cosentino
> Priority: Major
> Fix For: 2.23.0
>
>
> Currently the camel-jbpm component relies on the old 6.5 version of jBPM,
> which is no longer maintained; the API it uses does not exist in jBPM v7.
>
> This issue is about upgrading the component to the latest version of jBPM
> (7.14 as of now) and extending it with a consumer for jBPM. The consumer is
> then responsible for sending events produced by jBPM (process events, case
> events, user task events) to Camel routes.