gnodet commented on code in PR #22338:
URL: https://github.com/apache/camel/pull/22338#discussion_r3010398952


##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/CamundaComponent.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda;
+
+import java.net.URI;
+import java.util.Map;
+
+import io.camunda.client.CamundaClient;
+import io.camunda.client.CamundaClientBuilder;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.camunda.internal.CamundaService;
+import org.apache.camel.component.camunda.internal.OperationName;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.support.DefaultComponent;
+
+@org.apache.camel.spi.annotations.Component("camunda")
+public class CamundaComponent extends DefaultComponent {
+
+    @Metadata(label = "security", description = "Camunda SaaS cluster ID. When 
set, the client connects via the cloud builder.")
+    String clusterId;
+
+    @Metadata(label = "security", description = "Camunda SaaS region (default: 
bru-2).", defaultValue = "bru-2")
+    String region = "bru-2";
+
+    @Metadata(label = "security", secret = true,
+              description = "Client ID for OAuth / SaaS authentication.")
+    String clientId;
+
+    @Metadata(label = "security", secret = true,
+              description = "Client secret for OAuth / SaaS authentication.")
+    String clientSecret;
+
+    @Metadata(label = "common",
+              description = "gRPC address of the Camunda cluster (e.g. http://localhost:26500). "
+                            + "Used for self-managed connections when clusterId is not set.",
+              defaultValue = "http://localhost:26500")
+    String grpcAddress = "http://localhost:26500";
+
+    @Metadata(label = "common",
+              description = "REST address of the Camunda cluster (e.g. http://localhost:8080). "
+                            + "Used for self-managed connections when clusterId is not set.",
+              defaultValue = "http://localhost:8080")
+    String restAddress = "http://localhost:8080";
+
+    @Metadata(label = "security",
+              description = "OAuth authorization server URL for self-managed 
authentication.")
+    String oAuthAPI;
+
+    private CamundaService camundaService;
+
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        OperationName operationName = OperationName.fromValue(remaining);
+
+        Endpoint endpoint = new CamundaEndpoint(uri, this, operationName);
+        setProperties(endpoint, parameters);
+        return endpoint;
+    }
+
+    public CamundaService getCamundaService() {
+        return camundaService;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getClientSecret() {
+        return clientSecret;
+    }
+
+    public void setClientSecret(String clientSecret) {
+        this.clientSecret = clientSecret;
+    }
+
+    public String getGrpcAddress() {
+        return grpcAddress;
+    }
+
+    public void setGrpcAddress(String grpcAddress) {
+        this.grpcAddress = grpcAddress;
+    }
+
+    public String getRestAddress() {
+        return restAddress;
+    }
+
+    public void setRestAddress(String restAddress) {
+        this.restAddress = restAddress;
+    }
+
+    public String getOAuthAPI() {
+        return oAuthAPI;
+    }
+
+    public void setOAuthAPI(String oAuthAPI) {
+        this.oAuthAPI = oAuthAPI;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (camundaService == null) {
+            CamundaClient client = buildClient();
+            camundaService = new CamundaService(client);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (camundaService != null) {
+            camundaService.doStop();
+            camundaService = null;
+        }
+    }
+
+    private CamundaClient buildClient() {
+        if (clusterId != null) {
+            return CamundaClient.newCloudClientBuilder()
+                    .withClusterId(clusterId)
+                    .withClientId(clientId)
+                    .withClientSecret(clientSecret)
+                    .withRegion(region)
+                    .build();
+        }
+
+        CamundaClientBuilder builder = CamundaClient.newClientBuilder()
+                .grpcAddress(URI.create(grpcAddress))
+                .restAddress(URI.create(restAddress));
+
+        if (clientId != null && oAuthAPI != null) {
+            builder.credentialsProvider(
+                    new 
io.camunda.client.impl.oauth.OAuthCredentialsProviderBuilder()
+                            .authorizationServerUrl(oAuthAPI)
+                            .audience(grpcAddress)
+                            .clientId(clientId)
+                            .clientSecret(clientSecret)
+                            .build());
+        }
+
+        return builder.build();

Review Comment:
   **Major**: When `clusterId` is null and no OAuth is configured (typical 
local Docker/dev setup), there's no way to disable TLS. The Camunda Java Client 
defaults to TLS, so connecting to a local non-TLS instance will fail.
   
   The zeebe component handles this with `usePlaintext()`. Consider adding a 
`usePlaintext` boolean component option for self-managed setups without TLS.



##########
components/camel-camunda/pom.xml:
##########
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>4.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-camunda</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: Camunda</name>
+    <description>Camel Camunda component using the Camunda 8 Java 
Client</description>
+
+    <properties>
+        <firstVersion>4.19</firstVersion>
+        <supportLevel>Preview</supportLevel>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.camunda</groupId>
+            <artifactId>camunda-client-java</artifactId>
+            <version>${camunda-client-java-version}</version>

Review Comment:
   **Blocking**: The property `${camunda-client-java-version}` is not defined 
in `parent/pom.xml`. The build will fail. You need to add this property to the 
parent POM's `<properties>` section alongside the existing `zeebe-version` 
property.



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/internal/CamundaService.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.internal;
+
+import java.time.Duration;
+import java.util.List;
+
+import io.camunda.client.CamundaClient;
+import io.camunda.client.api.command.CreateProcessInstanceCommandStep1;
+import io.camunda.client.api.response.DeploymentEvent;
+import io.camunda.client.api.response.Process;
+import io.camunda.client.api.response.ProcessInstanceEvent;
+import io.camunda.client.api.response.PublishMessageResponse;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.component.camunda.model.DeploymentRequest;
+import org.apache.camel.component.camunda.model.DeploymentResponse;
+import org.apache.camel.component.camunda.model.JobRequest;
+import org.apache.camel.component.camunda.model.JobResponse;
+import org.apache.camel.component.camunda.model.MessageRequest;
+import org.apache.camel.component.camunda.model.MessageResponse;
+import org.apache.camel.component.camunda.model.ProcessDeploymentResponse;
+import org.apache.camel.component.camunda.model.ProcessRequest;
+import org.apache.camel.component.camunda.model.ProcessResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaService.class);
+
+    private final CamundaClient client;
+
+    public CamundaService(CamundaClient client) {
+        this.client = client;
+    }
+
+    public void doStop() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    public ProcessResponse startProcess(ProcessRequest processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        resultMessage.setProcessId(processMessage.getProcessId());
+
+        try {
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep3 step3;
+
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep2 step2
+                    = client.newCreateInstanceCommand()
+                            .bpmnProcessId(processMessage.getProcessId());
+
+            if (processMessage.getProcessVersion() > 0) {
+                step3 = step2.version(processMessage.getProcessVersion());
+            } else {
+                step3 = step2.latestVersion();
+            }
+
+            if (!processMessage.getVariables().isEmpty()) {

Review Comment:
   **Blocking**: If `getVariables()` returns `null` (e.g., from JSON 
deserialization, or explicit `setVariables(null)`), this throws NPE. The 
`Collections.emptyMap()` default only works if nobody calls 
`setVariables(null)`.
   
   Same issue on lines 136 and 157.
   
   ```suggestion
               if (processMessage.getVariables() != null && !processMessage.getVariables().isEmpty()) {
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/CamundaConsumer.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.client.api.response.ActivatedJob;
+import io.camunda.client.api.worker.JobClient;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.camunda.internal.OperationName;
+import org.apache.camel.component.camunda.model.JobWorkerMessage;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaConsumer.class);
+
+    private final CamundaEndpoint endpoint;
+
+    private JobWorker jobWorker;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public CamundaConsumer(CamundaEndpoint endpoint, Processor processor) 
throws CamelException {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final OperationName operationName = getEndpoint().getOperationName();
+        switch (operationName) {
+            case REGISTER_JOB_WORKER:
+                ObjectHelper.notNull(getEndpoint().getJobType(), "jobType");
+
+                jobWorker = 
getEndpoint().getCamundaService().registerJobHandler(new ConsumerJobHandler(),
+                        getEndpoint().getJobType(), 
getEndpoint().getTimeout());
+                break;
+            default:
+                throw new CamelException(String.format("Invalid Operation for 
Consumer %s", operationName.value()));
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (jobWorker != null && jobWorker.isOpen()) {
+            jobWorker.close();
+        }
+    }
+
+    private class ConsumerJobHandler implements JobHandler {
+        @Override
+        public void handle(JobClient client, ActivatedJob job) throws 
Exception {

Review Comment:
   **Blocking**: The `JobClient client` parameter is received but never used. 
After the exchange is processed (line 114), the job is never completed or 
failed on the Camunda side. This means:
   - On success: the job will time out and be retried → duplicate processing
   - On failure: same behavior, no error feedback to Camunda
   
   Either:
   1. Auto-complete on success / auto-fail on exception (standard job worker 
pattern), or
   2. Document very prominently that the user's route **must** explicitly call 
`completeJob` or `failJob`
   
   The zeebe component has the same design, but it's still a footgun worth 
addressing in a new component.



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/CamundaEndpoint.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda;
+
+import java.util.Map;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.camunda.internal.CamundaService;
+import org.apache.camel.component.camunda.internal.OperationName;
+import org.apache.camel.spi.EndpointServiceLocation;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Interact with Camunda 8 Orchestration Clusters using the Camunda Java 
Client.
+ */
+@UriEndpoint(firstVersion = "4.19.0", scheme = "camunda", title = "Camunda",
+             syntax = "camunda:operationName",
+             category = { Category.WORKFLOW, Category.SAAS },
+             headersClass = CamundaConstants.class)
+public class CamundaEndpoint extends DefaultEndpoint implements 
EndpointServiceLocation {
+
+    @UriPath(label = "common", description = "The operation to use",
+             enums = 
"startProcess,cancelProcess,publishMessage,completeJob,failJob,updateJobRetries,worker,throwError,deployResource")
+    @Metadata(required = true)
+    private OperationName operationName;
+
+    @UriParam(defaultValue = "false")
+    @Metadata(description = "Format the result in the body as JSON.")
+    private boolean formatJSON;
+
+    @UriParam
+    @Metadata(label = "consumer", description = "Job type for the job worker.")
+    private String jobType;
+
+    @UriParam(defaultValue = "10")
+    @Metadata(label = "consumer", description = "Timeout for job worker in 
seconds.")
+    private int timeout = 10;
+
+    public CamundaEndpoint() {
+    }
+
+    public CamundaEndpoint(String uri, CamundaComponent component, 
OperationName operationName) {
+        super(uri, component);
+        this.operationName = operationName;
+    }
+
+    @Override
+    public String getServiceUrl() {
+        if (getComponent().getClusterId() != null) {
+            return getComponent().getClusterId() + "." + 
getComponent().getRegion();
+        }
+        return getComponent().getGrpcAddress();
+    }
+
+    @Override
+    public String getServiceProtocol() {
+        return "rest";
+    }
+
+    @Override
+    public Map<String, String> getServiceMetadata() {
+        if (getComponent().getClientId() != null) {
+            return Map.of("clientId", getComponent().getClientId());
+        }
+        return null;
+    }
+
+    public Producer createProducer() throws Exception {

Review Comment:
   **Major**: No validation that the operation is appropriate for a producer. 
If someone creates `camunda://worker` as a producer, the `CamundaProducer` 
constructor won't find a matching processor and `processor` stays null, leading 
to `CamelException("No processor set")` at start.
   
   Validate here that `operationName != OperationName.REGISTER_JOB_WORKER` and 
throw a descriptive error. Same in `createConsumer()` (line 95) — validate that 
the operation is `REGISTER_JOB_WORKER` for consumers.



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/CamundaConstants.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda;
+
+import org.apache.camel.spi.Metadata;
+
+public final class CamundaConstants {
+
+    public static final String HEADER_PREFIX = "CamelCamunda";
+
+    @Metadata(label = "producer", description = "Job key for the worker job", 
javaType = "long")

Review Comment:
   **Minor**: This header is also set by the consumer in 
`CamundaConsumer.ConsumerJobHandler.handle()` (line 100). Label should reflect 
that:
   
   ```suggestion
       @Metadata(label = "common", description = "Job key for the worker job", javaType = "long")
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/CamundaConsumer.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.client.api.response.ActivatedJob;
+import io.camunda.client.api.worker.JobClient;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.camunda.internal.OperationName;
+import org.apache.camel.component.camunda.model.JobWorkerMessage;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaConsumer.class);
+
+    private final CamundaEndpoint endpoint;
+
+    private JobWorker jobWorker;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public CamundaConsumer(CamundaEndpoint endpoint, Processor processor) 
throws CamelException {

Review Comment:
   nit: Nothing in the constructor can throw `CamelException`. Remove it:
   
   ```suggestion
       public CamundaConsumer(CamundaEndpoint endpoint, Processor processor) {
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/processor/AbstractBaseProcessor.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.camunda.CamundaConstants;
+import org.apache.camel.component.camunda.CamundaEndpoint;
+import org.apache.camel.component.camunda.model.CamundaMessage;
+import org.apache.camel.support.service.BaseService;
+
+public abstract class AbstractBaseProcessor extends BaseService implements 
CamundaProcessor {
+    protected final CamundaEndpoint endpoint;
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    public AbstractBaseProcessor(CamundaEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    protected void setBody(Exchange exchange, CamundaMessage message, boolean 
formatJSON) {

Review Comment:
   **Major**: The `formatJSON` parameter is accepted but never used — the 
method body uses `endpoint.isFormatJSON()` instead. Since all callers pass 
`endpoint.isFormatJSON()` anyway, remove the parameter to avoid confusion:
   
   ```suggestion
       protected void setBody(Exchange exchange, CamundaMessage message) {
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/internal/CamundaService.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.internal;
+
+import java.time.Duration;
+import java.util.List;
+
+import io.camunda.client.CamundaClient;
+import io.camunda.client.api.command.CreateProcessInstanceCommandStep1;
+import io.camunda.client.api.response.DeploymentEvent;
+import io.camunda.client.api.response.Process;
+import io.camunda.client.api.response.ProcessInstanceEvent;
+import io.camunda.client.api.response.PublishMessageResponse;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.component.camunda.model.DeploymentRequest;
+import org.apache.camel.component.camunda.model.DeploymentResponse;
+import org.apache.camel.component.camunda.model.JobRequest;
+import org.apache.camel.component.camunda.model.JobResponse;
+import org.apache.camel.component.camunda.model.MessageRequest;
+import org.apache.camel.component.camunda.model.MessageResponse;
+import org.apache.camel.component.camunda.model.ProcessDeploymentResponse;
+import org.apache.camel.component.camunda.model.ProcessRequest;
+import org.apache.camel.component.camunda.model.ProcessResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaService.class);
+
+    private final CamundaClient client;
+
+    public CamundaService(CamundaClient client) {
+        this.client = client;
+    }
+
+    public void doStop() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    public ProcessResponse startProcess(ProcessRequest processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        resultMessage.setProcessId(processMessage.getProcessId());
+
+        try {
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep3 step3;
+
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep2 step2
+                    = client.newCreateInstanceCommand()
+                            .bpmnProcessId(processMessage.getProcessId());
+
+            if (processMessage.getProcessVersion() > 0) {
+                step3 = step2.version(processMessage.getProcessVersion());
+            } else {
+                step3 = step2.latestVersion();
+            }
+
+            if (!processMessage.getVariables().isEmpty()) {
+                step3.variables(processMessage.getVariables());
+            }
+
+            ProcessInstanceEvent event = step3.send().join();
+
+            resultMessage.setProcessId(event.getBpmnProcessId());
+            resultMessage.setProcessKey(event.getProcessDefinitionKey());
+            resultMessage.setProcessVersion(event.getVersion());
+            resultMessage.setProcessInstanceKey(event.getProcessInstanceKey());
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Camunda Error", exception);
+            
resultMessage.setProcessVersion(processMessage.getProcessVersion());
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public ProcessResponse cancelProcessInstance(ProcessRequest 
processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        
resultMessage.setProcessInstanceKey(processMessage.getProcessInstanceKey());
+
+        try {
+            
client.newCancelInstanceCommand(processMessage.getProcessInstanceKey())
+                    .send()
+                    .join();
+
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot cancel process instance {}", 
processMessage.getProcessId(), exception);

Review Comment:
   **Minor**: For cancel operations, `processMessage.getProcessId()` may be 
null. The relevant identifier is the `processInstanceKey`.
   
   ```suggestion
               LOG.error("Cannot cancel process instance {}", processMessage.getProcessInstanceKey(), exception);
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/internal/CamundaService.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.internal;
+
+import java.time.Duration;
+import java.util.List;
+
+import io.camunda.client.CamundaClient;
+import io.camunda.client.api.command.CreateProcessInstanceCommandStep1;
+import io.camunda.client.api.response.DeploymentEvent;
+import io.camunda.client.api.response.Process;
+import io.camunda.client.api.response.ProcessInstanceEvent;
+import io.camunda.client.api.response.PublishMessageResponse;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.component.camunda.model.DeploymentRequest;
+import org.apache.camel.component.camunda.model.DeploymentResponse;
+import org.apache.camel.component.camunda.model.JobRequest;
+import org.apache.camel.component.camunda.model.JobResponse;
+import org.apache.camel.component.camunda.model.MessageRequest;
+import org.apache.camel.component.camunda.model.MessageResponse;
+import org.apache.camel.component.camunda.model.ProcessDeploymentResponse;
+import org.apache.camel.component.camunda.model.ProcessRequest;
+import org.apache.camel.component.camunda.model.ProcessResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaService.class);
+
+    private final CamundaClient client;
+
+    public CamundaService(CamundaClient client) {
+        this.client = client;
+    }
+
+    public void doStop() {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    public ProcessResponse startProcess(ProcessRequest processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        resultMessage.setProcessId(processMessage.getProcessId());
+
+        try {
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep3 step3;
+
+            
CreateProcessInstanceCommandStep1.CreateProcessInstanceCommandStep2 step2
+                    = client.newCreateInstanceCommand()
+                            .bpmnProcessId(processMessage.getProcessId());
+
+            if (processMessage.getProcessVersion() > 0) {
+                step3 = step2.version(processMessage.getProcessVersion());
+            } else {
+                step3 = step2.latestVersion();
+            }
+
+            if (!processMessage.getVariables().isEmpty()) {
+                step3.variables(processMessage.getVariables());
+            }
+
+            ProcessInstanceEvent event = step3.send().join();
+
+            resultMessage.setProcessId(event.getBpmnProcessId());
+            resultMessage.setProcessKey(event.getProcessDefinitionKey());
+            resultMessage.setProcessVersion(event.getVersion());
+            resultMessage.setProcessInstanceKey(event.getProcessInstanceKey());
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Camunda Error", exception);
+            
resultMessage.setProcessVersion(processMessage.getProcessVersion());
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public ProcessResponse cancelProcessInstance(ProcessRequest 
processMessage) {
+        ProcessResponse resultMessage = new ProcessResponse();
+        
resultMessage.setProcessInstanceKey(processMessage.getProcessInstanceKey());
+
+        try {
+            
client.newCancelInstanceCommand(processMessage.getProcessInstanceKey())
+                    .send()
+                    .join();
+
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot cancel process instance {}", 
processMessage.getProcessId(), exception);
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public MessageResponse publishMessage(MessageRequest message) {
+        MessageResponse resultMessage = new MessageResponse();
+        resultMessage.setCorrelationKey(message.getCorrelationKey());
+
+        try {
+            if (message.getCorrelationKey() == null) {
+                LOG.error("Correlation Key is missing!");
+                resultMessage.setSuccess(false);
+                resultMessage.setErrorMessage("Correlation Key is missing!");
+                return resultMessage;
+            }
+
+            var cmd = client.newPublishMessageCommand()
+                    .messageName(message.getName())
+                    .correlationKey(message.getCorrelationKey());
+
+            if (message.getTimeToLive() >= 0) {
+                cmd.timeToLive(Duration.ofMillis(message.getTimeToLive()));
+            }
+            if (message.getMessageId() != null) {
+                cmd.messageId(message.getMessageId());
+            }
+            if (!message.getVariables().isEmpty()) {
+                cmd.variables(message.getVariables());
+            }
+
+            PublishMessageResponse response = cmd.send().join();
+            resultMessage.setMessageKey(response.getMessageKey());
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot publish message {}", 
message.getCorrelationKey(), exception);
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public JobResponse completeJob(JobRequest message) {
+        JobResponse resultMessage = new JobResponse();
+
+        try {
+            var cmd = client.newCompleteCommand(message.getJobKey());
+            if (!message.getVariables().isEmpty()) {
+                cmd.variables(message.getVariables());
+            }
+            cmd.send().join();
+
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot complete Job {}", message.getJobKey(), 
exception);
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public JobResponse failJob(JobRequest message) {
+        JobResponse resultMessage = new JobResponse();
+
+        try {
+            var cmd = client.newFailCommand(message.getJobKey())
+                    .retries(message.getRetries());
+            if (message.getFailMessage() != null) {
+                cmd.errorMessage(message.getFailMessage());
+            }
+            cmd.send().join();
+
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot fail Job {}", message.getJobKey(), exception);
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public JobResponse updateJobRetries(JobRequest message) {
+        JobResponse resultMessage = new JobResponse();
+
+        try {
+            client.newUpdateRetriesCommand(message.getJobKey())
+                    .retries(message.getRetries())
+                    .send()
+                    .join();
+
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot update retries for Job {}", message.getJobKey(), 
exception);
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public JobResponse throwError(JobRequest message) {
+        JobResponse resultMessage = new JobResponse();
+
+        try {
+            var cmd = client.newThrowErrorCommand(message.getJobKey())
+                    .errorCode(message.getErrorCode());
+            if (message.getErrorMessage() != null) {
+                cmd.errorMessage(message.getErrorMessage());
+            }
+            cmd.send().join();
+
+            resultMessage.setSuccess(true);
+        } catch (Exception exception) {
+            LOG.error("Cannot throw error for Job {}", message.getJobKey(), 
exception);
+            resultMessage.setErrorMessage(exception.getMessage());
+            resultMessage.setSuccess(false);
+        }
+
+        return resultMessage;
+    }
+
+    public DeploymentResponse deployResource(DeploymentRequest message) {
+        DeploymentResponse resultMessage = new DeploymentResponse();
+
+        try {
+            DeploymentEvent event = client.newDeployResourceCommand()
+                    .addResourceBytes(message.getFileContent(), 
message.getName())
+                    .send()
+                    .join();
+
+            List<Process> processes = event.getProcesses();
+            if (!processes.isEmpty()) {
+                Process process = processes.get(0);

Review Comment:
   **Minor**: If a BPMN file contains multiple processes, only the first is 
reported. Consider logging a warning when `processes.size() > 1` so users know 
additional processes were deployed but not reported.



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/CamundaConsumer.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.client.api.response.ActivatedJob;
+import io.camunda.client.api.worker.JobClient;
+import io.camunda.client.api.worker.JobHandler;
+import io.camunda.client.api.worker.JobWorker;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.camunda.internal.OperationName;
+import org.apache.camel.component.camunda.model.JobWorkerMessage;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamundaConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamundaConsumer.class);
+
+    private final CamundaEndpoint endpoint;
+
+    private JobWorker jobWorker;
+
+    private ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   **Minor**: `ObjectMapper` is thread-safe once configured and expensive to 
create. Consider making it a static final field shared across all instances. 
The same applies to the `ObjectMapper` in `AbstractBaseProcessor`.
   
   ```suggestion
       private static final ObjectMapper objectMapper = new ObjectMapper();
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/processor/DeploymentProcessor.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.processor;
+
+import java.io.InputStream;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.camunda.CamundaConstants;
+import org.apache.camel.component.camunda.CamundaEndpoint;
+import org.apache.camel.component.camunda.internal.CamundaService;
+import org.apache.camel.component.camunda.model.DeploymentRequest;
+import org.apache.camel.component.camunda.model.DeploymentResponse;
+import org.apache.camel.component.camunda.model.ProcessDeploymentResponse;
+
+public class DeploymentProcessor extends AbstractBaseProcessor {
+    public DeploymentProcessor(CamundaEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        DeploymentRequest message = null;
+
+        Object body = exchange.getMessage().getBody();
+        String headerResourceName = 
exchange.getMessage().getHeader(CamundaConstants.RESOURCE_NAME, String.class);
+        if (headerResourceName != null && (body instanceof String || body 
instanceof byte[] || body instanceof InputStream)) {
+            message = new DeploymentRequest();
+            message.setName(headerResourceName);
+            if (body instanceof String) {
+                message.setFileContent(((String) body).getBytes());
+            } else if (body instanceof byte[]) {
+                message.setFileContent((byte[]) body);
+            } else {
+                message.setFileContent(((InputStream) body).readAllBytes());

Review Comment:
   **Major**: The `InputStream` is read but never closed here, so the stream 
can leak whether `readAllBytes()` succeeds or fails partway through. Use 
try-with-resources or close it in a finally block.
   
   ```suggestion
                   try (InputStream is = (InputStream) body) {
                       message.setFileContent(is.readAllBytes());
                   }
   ```



##########
components/camel-camunda/src/main/java/org/apache/camel/component/camunda/internal/OperationName.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.camunda.internal;
+
+public enum OperationName {
+    START_PROCESS("startProcess"),
+    CANCEL_PROCESS("cancelProcess"),
+    PUBLISH_MESSAGE("publishMessage"),
+    COMPLETE_JOB("completeJob"),
+    FAIL_JOB("failJob"),
+    UPDATE_JOB_RETRIES("updateJobRetries"),
+    REGISTER_JOB_WORKER("worker"),
+    THROW_ERROR("throwError"),
+    DEPLOY_RESOURCE("deployResource");
+
+    private final String value;
+
+    OperationName(String value) {
+        this.value = value;
+    }
+
+    public String value() {
+        return value;
+    }
+
+    public static OperationName fromValue(String value) {
+        for (OperationName operationName : OperationName.values()) {
+            if (operationName.value.equals(value)) {
+                return operationName;
+            }
+        }
+        throw new IllegalArgumentException(value);

Review Comment:
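   nit: include context in the exception message so that an unknown operation 
name is easy to diagnose from the stack trace alone: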
   ```suggestion
           throw new IllegalArgumentException("Unknown operation name: " + 
value);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to