apupier commented on code in PR #22338:
URL: https://github.com/apache/camel/pull/22338#discussion_r3010403363


##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/internal/CamundaService.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.internal;
+
+import java.time.Duration;
+import java.util.List;
+
+import io.camunda.client.CamundaClient;
+import io.camunda.client.api.command.CreateProcessInstanceCommandStep1;
+import io.camunda.client.api.response.DeploymentEvent;
+import io.camunda.client.api.response.Process;
+import io.camunda.client.api.response.ProcessInstanceEvent;
+import io.camunda.client.api.response.PublishMessageResponse;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.component.camunda.model.DeploymentRequest;
+import org.apache.camel.component.camunda.model.DeploymentResponse;
+import org.apache.camel.component.camunda.model.JobRequest;
+import org.apache.camel.component.camunda.model.JobResponse;
+import org.apache.camel.component.camunda.model.MessageRequest;
+import org.apache.camel.component.camunda.model.MessageResponse;
+import org.apache.camel.component.camunda.model.ProcessDeploymentResponse;
+import org.apache.camel.component.camunda.model.ProcessRequest;
+import org.apache.camel.component.camunda.model.ProcessResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaService.class);
+
+    private final CamundaClient client;
+
+    public CamundaService(CamundaClient client) {
+        this.client = client;
+    }
+
+    public void doStop() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    public ProcessResponse startProcess(ProcessRequest processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        resultMessage.setProcessId(processMessage.getProcessId());
+
+        try {
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep3 step3;
+
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep2 step2
+                    = client.newCreateInstanceCommand()
+                            .bpmnProcessId(processMessage.getProcessId());
+
+            if (processMessage.getProcessVersion() > 0) {
+                step3 = step2.version(processMessage.getProcessVersion());
+            } else {
+                step3 = step2.latestVersion();
+            }
+
+            if (!processMessage.getVariables().isEmpty()) {
+                step3.variables(processMessage.getVariables());
+            }
+
+            ProcessInstanceEvent event = step3.send().join();
+
+            resultMessage.setProcessId(event.getBpmnProcessId());
+            resultMessage.setProcessKey(event.getProcessDefinitionKey());
+            resultMessage.setProcessVersion(event.getVersion());
+            resultMessage.setProcessInstanceKey(event.getProcessInstanceKey());
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Camunda Error", exception);
+            
resultMessage.setProcessVersion(processMessage.getProcessVersion());
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public ProcessResponse cancelProcessInstance(ProcessRequest 
processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        
resultMessage.setProcessInstanceKey(processMessage.getProcessInstanceKey());
+
+        try {
+            
client.newCancelInstanceCommand(processMessage.getProcessInstanceKey())
+                    .send()
+                    .join();

Review Comment:
   I think we need to specify a configurable timeout on all the join 
operations. (can be one timeout for the whole  component) Otherwise we can have 
pending processes for indefinite time.  But well, maybe it is handled in 
another way already that I have not found as the camel-zeebe component was 
doing the same 
https://github.com/apache/camel/blob/ffbce6a277e44bc2fbff7b1eaeac657484be31f3/components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/internal/ZeebeService.java#L121
   and not blocking for this PR as this was already like that with zeebe but 
maybe worth raising an issue for that later on (unless we find how it is 
configured)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to