This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ab32b6  CAMEL-16991 - Fixing thread safety issues with ClientConfiguration object in producers (#6135)
9ab32b6 is described below

commit 9ab32b6d193f5c0e40e84db6fae63941533d88e5
Author: Hokutor <[email protected]>
AuthorDate: Wed Sep 22 02:49:09 2021 -0400

    CAMEL-16991 - Fixing thread safety issues with ClientConfiguration object in producers (#6135)
    
    Fixing thread safety issues with ClientConfiguration object in producers
    
    Co-Authored-By: Reji Mathews <[email protected]>
    Co-Authored-By: lyndonmiao <[email protected]>
    
    Co-authored-by: Reji Mathews <[email protected]>
    Co-authored-by: lyndonmiao <[email protected]>
---
 .../component/huaweicloud/dms/DMSProducer.java     | 91 ++++++++++------------
 .../org/apache/camel/FunctionGraphProducer.java    | 36 ++++-----
 .../component/huaweicloud/iam/IAMProducer.java     | 59 +++++++-------
 .../component/huaweicloud/obs/OBSProducer.java     | 82 ++++++++++---------
 .../smn/SimpleNotificationProducer.java            | 45 ++++++-----
 5 files changed, 158 insertions(+), 155 deletions(-)

diff --git 
a/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
 
b/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
index 601ab8c..394caf6 100644
--- 
a/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
+++ 
b/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 public class DMSProducer extends DefaultProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(DMSProducer.class);
     private DMSEndpoint endpoint;
-    private ClientConfigurations clientConfigurations;
     private DmsClient dmsClient;
     private ObjectMapper mapper;
 
@@ -57,29 +56,36 @@ public class DMSProducer extends DefaultProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.clientConfigurations = new ClientConfigurations();
-        this.dmsClient = this.endpoint.initClient();
         this.mapper = new ObjectMapper();
     }
 
     public void process(Exchange exchange) throws Exception {
-        updateClientConfigs(exchange);
+
+        ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+        if (dmsClient == null) {
+            LOG.debug("Initializing SDK client");
+            this.dmsClient = endpoint.initClient();
+            LOG.debug("Successfully initialized SDK client");
+        }
+
+        updateClientConfigs(exchange, clientConfigurations);
 
         switch (clientConfigurations.getOperation()) {
             case DMSOperations.CREATE_INSTANCE:
-                createInstance(exchange);
+                createInstance(exchange, clientConfigurations);
                 break;
             case DMSOperations.DELETE_INSTANCE:
-                deleteInstance(exchange);
+                deleteInstance(exchange, clientConfigurations);
                 break;
             case DMSOperations.LIST_INSTANCES:
-                listInstances(exchange);
+                listInstances(exchange, clientConfigurations);
                 break;
             case DMSOperations.QUERY_INSTANCE:
-                queryInstance(exchange);
+                queryInstance(exchange, clientConfigurations);
                 break;
             case DMSOperations.UPDATE_INSTANCE:
-                updateInstance(exchange);
+                updateInstance(exchange, clientConfigurations);
                 break;
             default:
                 throw new UnsupportedOperationException(
@@ -89,10 +95,12 @@ public class DMSProducer extends DefaultProducer {
 
     /**
      * Perform create instance operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws JsonProcessingException
      */
-    private void createInstance(Exchange exchange) throws 
JsonProcessingException {
+    private void createInstance(Exchange exchange, ClientConfigurations 
clientConfigurations) throws JsonProcessingException {
         CreateInstanceRequestBody body = null;
 
         // checking if user inputted exchange body containing instance 
information. Body must be a CreateInstanceRequestBody or a valid JSON String 
(Advanced users)
@@ -197,10 +205,11 @@ public class DMSProducer extends DefaultProducer {
 
     /**
      * Perform delete instance operation
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void deleteInstance(Exchange exchange) {
+    private void deleteInstance(Exchange exchange, ClientConfigurations 
clientConfigurations) {
         // check for instance id, which is mandatory to delete an instance
         if (ObjectHelper.isEmpty(clientConfigurations.getInstanceId())) {
             throw new IllegalArgumentException("Instance id is mandatory to 
delete an instance");
@@ -214,10 +223,12 @@ public class DMSProducer extends DefaultProducer {
 
     /**
      * Perform list instances operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws JsonProcessingException
      */
-    private void listInstances(Exchange exchange) throws 
JsonProcessingException {
+    private void listInstances(Exchange exchange, ClientConfigurations 
clientConfigurations) throws JsonProcessingException {
         ListInstancesRequest request = new ListInstancesRequest()
                 .withEngine(clientConfigurations.getEngine());
         ListInstancesResponse response = dmsClient.listInstances(request);
@@ -226,10 +237,12 @@ public class DMSProducer extends DefaultProducer {
 
     /**
      * Perform query instance operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws JsonProcessingException
      */
-    private void queryInstance(Exchange exchange) throws 
JsonProcessingException {
+    private void queryInstance(Exchange exchange, ClientConfigurations 
clientConfigurations) throws JsonProcessingException {
         // check for instance id, which is mandatory to query an instance
         if (ObjectHelper.isEmpty(clientConfigurations.getInstanceId())) {
             throw new IllegalArgumentException("Instance id is mandatory to 
query an instance");
@@ -243,10 +256,12 @@ public class DMSProducer extends DefaultProducer {
 
     /**
      * Perform update instance operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws JsonProcessingException
      */
-    private void updateInstance(Exchange exchange) throws 
JsonProcessingException {
+    private void updateInstance(Exchange exchange, ClientConfigurations 
clientConfigurations) throws JsonProcessingException {
         // check for instance id, which is mandatory to update an instance
         if (ObjectHelper.isEmpty(clientConfigurations.getInstanceId())) {
             throw new IllegalArgumentException("Instance id is mandatory to 
update an instance");
@@ -276,11 +291,11 @@ public class DMSProducer extends DefaultProducer {
      * Update dynamic client configurations. Some endpoint parameters 
(operation, user ID, and group ID) can also be
      * passed via exchange properties, so they can be updated between each 
transaction. Since they can change, we must
      * clear the previous transaction and update these parameters with their 
new values
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void updateClientConfigs(Exchange exchange) {
-        resetDynamicConfigs();
+    private void updateClientConfigs(Exchange exchange, ClientConfigurations 
clientConfigurations) {
 
         // checking for required operation (exchange overrides endpoint 
operation if both are provided)
         if (ObjectHelper.isEmpty(exchange.getProperty(DMSProperties.OPERATION))
@@ -400,28 +415,4 @@ public class DMSProducer extends DefaultProducer {
                         ? (String) 
exchange.getProperty(DMSProperties.STORAGE_SPEC_CODE)
                         : endpoint.getStorageSpecCode());
     }
-
-    /**
-     * Set all dynamic configurations to null
-     */
-    private void resetDynamicConfigs() {
-        clientConfigurations.setOperation(null);
-        clientConfigurations.setEngine(null);
-        clientConfigurations.setInstanceId(null);
-        clientConfigurations.setName(null);
-        clientConfigurations.setEngineVersion(null);
-        clientConfigurations.setSpecification(null);
-        clientConfigurations.setStorageSpace(null);
-        clientConfigurations.setPartitionNum(null);
-        clientConfigurations.setAccessUser(null);
-        clientConfigurations.setPassword(null);
-        clientConfigurations.setVpcId(null);
-        clientConfigurations.setSecurityGroupId(null);
-        clientConfigurations.setSubnetId(null);
-        clientConfigurations.setAvailableZones(null);
-        clientConfigurations.setProductId(null);
-        clientConfigurations.setKafkaManagerUser(null);
-        clientConfigurations.setKafkaManagerPassword(null);
-        clientConfigurations.setStorageSpecCode(null);
-    }
 }
diff --git 
a/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
 
b/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
index 2a26a81..4c436ef 100644
--- 
a/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
+++ 
b/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 public class FunctionGraphProducer extends DefaultProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(FunctionGraphProducer.class);
     private FunctionGraphEndpoint endpoint;
-    private ClientConfigurations clientConfigurations;
     private FunctionGraphClient functionGraphClient;
 
     public FunctionGraphProducer(FunctionGraphEndpoint endpoint) {
@@ -47,16 +46,22 @@ public class FunctionGraphProducer extends DefaultProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.clientConfigurations = new ClientConfigurations(this.endpoint);
-        this.functionGraphClient = this.endpoint.initClient();
     }
 
     public void process(Exchange exchange) throws Exception {
-        updateClientConfigs(exchange);
+        ClientConfigurations clientConfigurations = new 
ClientConfigurations(endpoint);
+
+        if (functionGraphClient == null) {
+            LOG.debug("Initializing SDK client");
+            this.functionGraphClient = endpoint.initClient();
+            LOG.debug("Successfully initialized SDK client");
+        }
+
+        updateClientConfigs(exchange, clientConfigurations);
 
         switch (clientConfigurations.getOperation()) {
             case FunctionGraphOperations.INVOKE_FUNCTION:
-                invokeFunction(exchange);
+                invokeFunction(exchange, clientConfigurations);
                 break;
             default:
                 throw new UnsupportedOperationException(
@@ -66,10 +71,11 @@ public class FunctionGraphProducer extends DefaultProducer {
 
     /**
      * Perform invoke function operation and map return object to exchange body
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void invokeFunction(Exchange exchange) {
+    private void invokeFunction(Exchange exchange, ClientConfigurations 
clientConfigurations) {
 
         // convert exchange body to Map object
         Object body = exchange.getMessage().getBody();
@@ -117,11 +123,11 @@ public class FunctionGraphProducer extends 
DefaultProducer {
      * Update dynamic client configurations. Some endpoint parameters 
(operation, function name, package, and
      * XCFFLogType) can also be passed via exchange properties, so they can be 
updated between each transaction. Since
      * they can change, we must clear the previous transaction and update 
these parameters with their new values
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void updateClientConfigs(Exchange exchange) {
-        resetDynamicConfigs();
+    private void updateClientConfigs(Exchange exchange, ClientConfigurations 
clientConfigurations) {
 
         // checking for required operation
         if 
(ObjectHelper.isEmpty(exchange.getProperty(FunctionGraphProperties.OPERATION))
@@ -163,14 +169,4 @@ public class FunctionGraphProducer extends DefaultProducer 
{
             clientConfigurations.setXCffLogType((String) 
exchange.getProperty(FunctionGraphProperties.XCFFLOGTYPE));
         }
     }
-
-    /**
-     * Set all dynamic configurations to null
-     */
-    private void resetDynamicConfigs() {
-        clientConfigurations.setOperation(null);
-        clientConfigurations.setFunctionName(null);
-        clientConfigurations.setFunctionPackage(null);
-        clientConfigurations.setXCffLogType(null);
-    }
 }
diff --git 
a/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
 
b/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
index e959d9a..522a876 100644
--- 
a/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
+++ 
b/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
 public class IAMProducer extends DefaultProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(IAMProducer.class);
     private IAMEndpoint endpoint;
-    private ClientConfigurations clientConfigurations;
     private IamClient iamClient;
     private Gson gson;
 
@@ -60,32 +59,39 @@ public class IAMProducer extends DefaultProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.clientConfigurations = new ClientConfigurations();
-        this.iamClient = this.endpoint.initClient();
         this.gson = new Gson();
     }
 
     public void process(Exchange exchange) throws Exception {
-        updateClientConfigs(exchange);
+
+        ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+        if (this.iamClient == null) {
+            LOG.info("Initializing SDK client");
+            this.iamClient = endpoint.initClient();
+            LOG.info("IAM client initialized");
+        }
+
+        updateClientConfigs(exchange, clientConfigurations);
 
         switch (clientConfigurations.getOperation()) {
             case IAMOperations.LIST_USERS:
                 listUsers(exchange);
                 break;
             case IAMOperations.GET_USER:
-                getUser(exchange);
+                getUser(exchange, clientConfigurations);
                 break;
             case IAMOperations.UPDATE_USER:
-                updateUser(exchange);
+                updateUser(exchange, clientConfigurations);
                 break;
             case IAMOperations.LIST_GROUPS:
                 listGroups(exchange);
                 break;
             case IAMOperations.GET_GROUP_USERS:
-                getGroupUsers(exchange);
+                getGroupUsers(exchange, clientConfigurations);
                 break;
             case IAMOperations.UPDATE_GROUP:
-                updateGroup(exchange);
+                updateGroup(exchange, clientConfigurations);
                 break;
             default:
                 throw new UnsupportedOperationException(
@@ -107,10 +113,11 @@ public class IAMProducer extends DefaultProducer {
 
     /**
      * Perform get user operation
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void getUser(Exchange exchange) {
+    private void getUser(Exchange exchange, ClientConfigurations 
clientConfigurations) {
         // check for user id, which is mandatory to get user
         if (ObjectHelper.isEmpty(clientConfigurations.getUserId())) {
             if (LOG.isErrorEnabled()) {
@@ -128,10 +135,11 @@ public class IAMProducer extends DefaultProducer {
 
     /**
      * Perform update user operation
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void updateUser(Exchange exchange) {
+    private void updateUser(Exchange exchange, ClientConfigurations 
clientConfigurations) {
         // checking for valid exchange body containing user information. Body 
must be an UpdateUserOption object or a JSON string
         Object body = exchange.getMessage().getBody();
         UpdateUserOption userOption;
@@ -169,7 +177,7 @@ public class IAMProducer extends DefaultProducer {
 
     /**
      * Perform list groups operation
-     *
+     * 
      * @param exchange
      */
     private void listGroups(Exchange exchange) {
@@ -181,10 +189,11 @@ public class IAMProducer extends DefaultProducer {
 
     /**
      * Perform get group users operation
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void getGroupUsers(Exchange exchange) {
+    private void getGroupUsers(Exchange exchange, ClientConfigurations 
clientConfigurations) {
         // check for group id, which is mandatory for getting group users
         if (ObjectHelper.isEmpty(clientConfigurations.getGroupId())) {
             if (LOG.isErrorEnabled()) {
@@ -202,10 +211,11 @@ public class IAMProducer extends DefaultProducer {
 
     /**
      * Perform update group operation
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void updateGroup(Exchange exchange) {
+    private void updateGroup(Exchange exchange, ClientConfigurations 
clientConfigurations) {
         // checking for valid exchange body containing group information. Body 
must be an KeystoneUpdateGroupOption object or a JSON string
         Object body = exchange.getMessage().getBody();
         KeystoneUpdateGroupOption groupOption;
@@ -245,11 +255,11 @@ public class IAMProducer extends DefaultProducer {
      * Update dynamic client configurations. Some endpoint parameters 
(operation, user ID, and group ID) can also be
      * passed via exchange properties, so they can be updated between each 
transaction. Since they can change, we must
      * clear the previous transaction and update these parameters with their 
new values
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void updateClientConfigs(Exchange exchange) {
-        resetDynamicConfigs();
+    private void updateClientConfigs(Exchange exchange, ClientConfigurations 
clientConfigurations) {
 
         // checking for required operation (exchange overrides endpoint 
operation if both are provided)
         if (ObjectHelper.isEmpty(exchange.getProperty(IAMProperties.OPERATION))
@@ -283,13 +293,4 @@ public class IAMProducer extends DefaultProducer {
                             : endpoint.getGroupId());
         }
     }
-
-    /**
-     * Set all dynamic configurations to null
-     */
-    private void resetDynamicConfigs() {
-        clientConfigurations.setOperation(null);
-        clientConfigurations.setUserId(null);
-        clientConfigurations.setGroupId(null);
-    }
 }
diff --git 
a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
 
b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
index 86f564a..c222ced 100644
--- 
a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
+++ 
b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
 public class OBSProducer extends DefaultProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(OBSProducer.class);
     private OBSEndpoint endpoint;
-    private ClientConfigurations clientConfigurations;
     private ObsClient obsClient;
     private Gson gson;
 
@@ -60,32 +59,37 @@ public class OBSProducer extends DefaultProducer {
     @Override
     protected void doInit() throws Exception {
         super.doInit();
-        this.clientConfigurations = new ClientConfigurations();
-        this.obsClient = this.endpoint.initClient();
         this.gson = new Gson();
     }
 
     public void process(Exchange exchange) throws Exception {
-        updateClientConfigs(exchange);
+
+        ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+        if (obsClient == null) {
+            this.obsClient = endpoint.initClient();
+        }
+
+        updateClientConfigs(exchange, clientConfigurations);
 
         switch (clientConfigurations.getOperation()) {
             case OBSOperations.LIST_BUCKETS:
                 listBuckets(exchange);
                 break;
             case OBSOperations.CREATE_BUCKET:
-                createBucket(exchange);
+                createBucket(exchange, clientConfigurations);
                 break;
             case OBSOperations.DELETE_BUCKET:
-                deleteBucket(exchange);
+                deleteBucket(exchange, clientConfigurations);
                 break;
             case OBSOperations.CHECK_BUCKET_EXISTS:
-                checkBucketExists(exchange);
+                checkBucketExists(exchange, clientConfigurations);
                 break;
             case OBSOperations.GET_BUCKET_METADATA:
-                getBucketMetadata(exchange);
+                getBucketMetadata(exchange, clientConfigurations);
                 break;
             case OBSOperations.LIST_OBJECTS:
-                listObjects(exchange);
+                listObjects(exchange, clientConfigurations);
                 break;
             default:
                 throw new UnsupportedOperationException(
@@ -95,8 +99,9 @@ public class OBSProducer extends DefaultProducer {
 
     /**
      * Perform list buckets operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @throws ObsException
      */
     private void listBuckets(Exchange exchange) throws ObsException {
         // invoke list buckets method and map response object to exchange body
@@ -107,10 +112,12 @@ public class OBSProducer extends DefaultProducer {
 
     /**
      * Perform create bucket operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws ObsException
      */
-    private void createBucket(Exchange exchange) throws ObsException {
+    private void createBucket(Exchange exchange, ClientConfigurations 
clientConfigurations) throws ObsException {
         CreateBucketRequest request = null;
 
         // checking if user inputted exchange body containing bucket 
information. Body must be a CreateBucketRequest or a valid JSON string 
(Advanced users)
@@ -153,10 +160,12 @@ public class OBSProducer extends DefaultProducer {
 
     /**
      * Perform delete bucket operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws ObsException
      */
-    private void deleteBucket(Exchange exchange) throws ObsException {
+    private void deleteBucket(Exchange exchange, ClientConfigurations 
clientConfigurations) throws ObsException {
         // check for bucket name, which is mandatory to delete a bucket
         if (ObjectHelper.isEmpty(clientConfigurations.getBucketName())) {
             LOG.error("No bucket name given");
@@ -170,10 +179,12 @@ public class OBSProducer extends DefaultProducer {
 
     /**
      * Perform check bucket exists operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws ObsException
      */
-    private void checkBucketExists(Exchange exchange) throws ObsException {
+    private void checkBucketExists(Exchange exchange, ClientConfigurations 
clientConfigurations) throws ObsException {
         // check for bucket name, which is mandatory to check if a bucket 
exists
         if (ObjectHelper.isEmpty(clientConfigurations.getBucketName())) {
             LOG.error("No bucket name given");
@@ -187,10 +198,12 @@ public class OBSProducer extends DefaultProducer {
 
     /**
      * Perform get bucket metadata operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws ObsException
      */
-    private void getBucketMetadata(Exchange exchange) throws ObsException {
+    private void getBucketMetadata(Exchange exchange, ClientConfigurations 
clientConfigurations) throws ObsException {
         // check for bucket name, which is mandatory to get bucket metadata
         if (ObjectHelper.isEmpty(clientConfigurations.getBucketName())) {
             LOG.error("No bucket name given");
@@ -205,10 +218,12 @@ public class OBSProducer extends DefaultProducer {
 
     /**
      * Perform list objects operation
-     *
-     * @param exchange
+     * 
+     * @param  exchange
+     * @param  clientConfigurations
+     * @throws ObsException
      */
-    private void listObjects(Exchange exchange) throws ObsException {
+    private void listObjects(Exchange exchange, ClientConfigurations 
clientConfigurations) throws ObsException {
         ListObjectsRequest request = null;
 
         // checking if user inputted exchange body containing list objects 
information. Body must be a ListObjectsRequest or a valid JSON string (Advanced 
users)
@@ -251,11 +266,11 @@ public class OBSProducer extends DefaultProducer {
      * Update dynamic client configurations. Some endpoint parameters 
(operation, and bucket name and location) can also
      * be passed via exchange properties, so they can be updated between each 
transaction. Since they can change, we
      * must clear the previous transaction and update these parameters with 
their new values
-     *
+     * 
      * @param exchange
+     * @param clientConfigurations
      */
-    private void updateClientConfigs(Exchange exchange) {
-        resetDynamicConfigs();
+    private void updateClientConfigs(Exchange exchange, ClientConfigurations 
clientConfigurations) {
 
         // checking for required operation (exchange overrides endpoint 
operation if both are provided)
         if (ObjectHelper.isEmpty(exchange.getProperty(OBSProperties.OPERATION))
@@ -287,13 +302,4 @@ public class OBSProducer extends DefaultProducer {
                             : endpoint.getBucketLocation());
         }
     }
-
-    /**
-     * Set all dynamic configurations to null
-     */
-    private void resetDynamicConfigs() {
-        clientConfigurations.setOperation(null);
-        clientConfigurations.setBucketName(null);
-        clientConfigurations.setBucketLocation(null);
-    }
 }
diff --git 
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
 
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
index 04f14a9..c5de448 100644
--- 
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
+++ 
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
 public class SimpleNotificationProducer extends DefaultProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(SimpleNotificationProducer.class);
     private SmnClient smnClient;
-    private ClientConfigurations clientConfigurations;
 
     public SimpleNotificationProducer(SimpleNotificationEndpoint endpoint) {
         super(endpoint);
@@ -48,11 +47,16 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        validateAndInitializeSmnClient((SimpleNotificationEndpoint) 
super.getEndpoint());
     }
 
     public void process(Exchange exchange) throws Exception {
 
+        ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+        if (smnClient == null) {
+            validateAndInitializeSmnClient((SimpleNotificationEndpoint) 
super.getEndpoint(), clientConfigurations);
+        }
+
         String service = ((SimpleNotificationEndpoint) 
super.getEndpoint()).getSmnService();
 
         if (!ObjectHelper.isEmpty(service)) {
@@ -61,7 +65,8 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Using message publishing service");
                     }
-                    
performPublishMessageServiceOperations((SimpleNotificationEndpoint) 
super.getEndpoint(), exchange);
+                    
performPublishMessageServiceOperations((SimpleNotificationEndpoint) 
super.getEndpoint(), exchange,
+                            clientConfigurations);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Completed publishing message");
                     }
@@ -82,15 +87,19 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
 
     /**
      * Publish message service operations
-     *
+     * 
      * @param endpoint
      * @param exchange
+     * @param clientConfigurations
      */
-    private void 
performPublishMessageServiceOperations(SimpleNotificationEndpoint endpoint, 
Exchange exchange) {
+    private void performPublishMessageServiceOperations(
+            SimpleNotificationEndpoint endpoint,
+            Exchange exchange,
+            ClientConfigurations clientConfigurations) {
         PublishMessageResponse response;
 
         PublishMessageRequestBody apiBody;
-        this.clientConfigurations = validateServiceConfigurations(endpoint, 
exchange);
+        validateServiceConfigurations(endpoint, exchange, 
clientConfigurations);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Checking operation name");
@@ -156,10 +165,13 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
 
     /**
      * validation and initialization of SmnClient object
-     *
+     * 
      * @param simpleNotificationEndpoint
+     * @param clientConfigurations
      */
-    private void validateAndInitializeSmnClient(SimpleNotificationEndpoint 
simpleNotificationEndpoint) {
+    private void validateAndInitializeSmnClient(
+            SimpleNotificationEndpoint simpleNotificationEndpoint,
+            ClientConfigurations clientConfigurations) {
         if (simpleNotificationEndpoint.getSmnClient() != null) {
             if (LOG.isWarnEnabled()) {
                 LOG.warn(
@@ -168,7 +180,6 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
             this.smnClient = simpleNotificationEndpoint.getSmnClient();
             return;
         }
-        this.clientConfigurations = new ClientConfigurations();
 
         //checking for cloud SK (secret key)
         if (ObjectHelper.isEmpty(simpleNotificationEndpoint.getSecretKey()) &&
@@ -292,15 +303,15 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
 
     /**
      * validation of all user inputs before attempting to invoke a service 
operation
-     *
-     * @param  simpleNotificationEndpoint
-     * @param  exchange
-     * @return
+     * 
+     * @param simpleNotificationEndpoint
+     * @param exchange
+     * @param clientConfigurations
      */
-    private ClientConfigurations validateServiceConfigurations(
-            SimpleNotificationEndpoint simpleNotificationEndpoint, Exchange 
exchange) {
+    private void validateServiceConfigurations(
+            SimpleNotificationEndpoint simpleNotificationEndpoint, Exchange 
exchange,
+            ClientConfigurations clientConfigurations) {
 
-        ClientConfigurations clientConfigurations = new ClientConfigurations();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Inspecting exchange body");
         }
@@ -369,7 +380,5 @@ public class SimpleNotificationProducer extends 
DefaultProducer {
         } else {
             clientConfigurations.setMessageTtl((int) 
exchange.getProperty(SmnProperties.NOTIFICATION_TTL));
         }
-
-        return clientConfigurations;
     }
 }

Reply via email to