This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9ab32b6 CAMEL-16991 - Fixing thread safety issues with
ClientConfiguration object in producers (#6135)
9ab32b6 is described below
commit 9ab32b6d193f5c0e40e84db6fae63941533d88e5
Author: Hokutor <[email protected]>
AuthorDate: Wed Sep 22 02:49:09 2021 -0400
CAMEL-16991 - Fixing thread safety issues with ClientConfiguration object
in producers (#6135)
Fixing thread safety issues with ClientConfiguration object in producers
Co-Authored-By: Reji Mathews <[email protected]>
Co-Authored-By: lyndonmiao <[email protected]>
Co-authored-by: Reji Mathews <[email protected]>
Co-authored-by: lyndonmiao <[email protected]>
---
.../component/huaweicloud/dms/DMSProducer.java | 91 ++++++++++------------
.../org/apache/camel/FunctionGraphProducer.java | 36 ++++-----
.../component/huaweicloud/iam/IAMProducer.java | 59 +++++++-------
.../component/huaweicloud/obs/OBSProducer.java | 82 ++++++++++---------
.../smn/SimpleNotificationProducer.java | 45 ++++++-----
5 files changed, 158 insertions(+), 155 deletions(-)
diff --git
a/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
b/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
index 601ab8c..394caf6 100644
---
a/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
+++
b/components/camel-huawei/camel-huaweicloud-dms/src/main/java/org/apache/camel/component/huaweicloud/dms/DMSProducer.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
public class DMSProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(DMSProducer.class);
private DMSEndpoint endpoint;
- private ClientConfigurations clientConfigurations;
private DmsClient dmsClient;
private ObjectMapper mapper;
@@ -57,29 +56,36 @@ public class DMSProducer extends DefaultProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.clientConfigurations = new ClientConfigurations();
- this.dmsClient = this.endpoint.initClient();
this.mapper = new ObjectMapper();
}
public void process(Exchange exchange) throws Exception {
- updateClientConfigs(exchange);
+
+ ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+ if (dmsClient == null) {
+ LOG.debug("Initializing SDK client");
+ this.dmsClient = endpoint.initClient();
+ LOG.debug("Successfully initialized SDK client");
+ }
+
+ updateClientConfigs(exchange, clientConfigurations);
switch (clientConfigurations.getOperation()) {
case DMSOperations.CREATE_INSTANCE:
- createInstance(exchange);
+ createInstance(exchange, clientConfigurations);
break;
case DMSOperations.DELETE_INSTANCE:
- deleteInstance(exchange);
+ deleteInstance(exchange, clientConfigurations);
break;
case DMSOperations.LIST_INSTANCES:
- listInstances(exchange);
+ listInstances(exchange, clientConfigurations);
break;
case DMSOperations.QUERY_INSTANCE:
- queryInstance(exchange);
+ queryInstance(exchange, clientConfigurations);
break;
case DMSOperations.UPDATE_INSTANCE:
- updateInstance(exchange);
+ updateInstance(exchange, clientConfigurations);
break;
default:
throw new UnsupportedOperationException(
@@ -89,10 +95,12 @@ public class DMSProducer extends DefaultProducer {
/**
* Perform create instance operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws JsonProcessingException
*/
- private void createInstance(Exchange exchange) throws
JsonProcessingException {
+ private void createInstance(Exchange exchange, ClientConfigurations
clientConfigurations) throws JsonProcessingException {
CreateInstanceRequestBody body = null;
// checking if user inputted exchange body containing instance
information. Body must be a CreateInstanceRequestBody or a valid JSON String
(Advanced users)
@@ -197,10 +205,11 @@ public class DMSProducer extends DefaultProducer {
/**
* Perform delete instance operation
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void deleteInstance(Exchange exchange) {
+ private void deleteInstance(Exchange exchange, ClientConfigurations
clientConfigurations) {
// check for instance id, which is mandatory to delete an instance
if (ObjectHelper.isEmpty(clientConfigurations.getInstanceId())) {
throw new IllegalArgumentException("Instance id is mandatory to
delete an instance");
@@ -214,10 +223,12 @@ public class DMSProducer extends DefaultProducer {
/**
* Perform list instances operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws JsonProcessingException
*/
- private void listInstances(Exchange exchange) throws
JsonProcessingException {
+ private void listInstances(Exchange exchange, ClientConfigurations
clientConfigurations) throws JsonProcessingException {
ListInstancesRequest request = new ListInstancesRequest()
.withEngine(clientConfigurations.getEngine());
ListInstancesResponse response = dmsClient.listInstances(request);
@@ -226,10 +237,12 @@ public class DMSProducer extends DefaultProducer {
/**
* Perform query instance operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws JsonProcessingException
*/
- private void queryInstance(Exchange exchange) throws
JsonProcessingException {
+ private void queryInstance(Exchange exchange, ClientConfigurations
clientConfigurations) throws JsonProcessingException {
// check for instance id, which is mandatory to query an instance
if (ObjectHelper.isEmpty(clientConfigurations.getInstanceId())) {
throw new IllegalArgumentException("Instance id is mandatory to
query an instance");
@@ -243,10 +256,12 @@ public class DMSProducer extends DefaultProducer {
/**
* Perform update instance operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws JsonProcessingException
*/
- private void updateInstance(Exchange exchange) throws
JsonProcessingException {
+ private void updateInstance(Exchange exchange, ClientConfigurations
clientConfigurations) throws JsonProcessingException {
// check for instance id, which is mandatory to update an instance
if (ObjectHelper.isEmpty(clientConfigurations.getInstanceId())) {
throw new IllegalArgumentException("Instance id is mandatory to
update an instance");
@@ -276,11 +291,11 @@ public class DMSProducer extends DefaultProducer {
* Update dynamic client configurations. Some endpoint parameters
(operation, user ID, and group ID) can also be
* passed via exchange properties, so they can be updated between each
transaction. Since they can change, we must
* clear the previous transaction and update these parameters with their
new values
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void updateClientConfigs(Exchange exchange) {
- resetDynamicConfigs();
+ private void updateClientConfigs(Exchange exchange, ClientConfigurations
clientConfigurations) {
// checking for required operation (exchange overrides endpoint
operation if both are provided)
if (ObjectHelper.isEmpty(exchange.getProperty(DMSProperties.OPERATION))
@@ -400,28 +415,4 @@ public class DMSProducer extends DefaultProducer {
? (String)
exchange.getProperty(DMSProperties.STORAGE_SPEC_CODE)
: endpoint.getStorageSpecCode());
}
-
- /**
- * Set all dynamic configurations to null
- */
- private void resetDynamicConfigs() {
- clientConfigurations.setOperation(null);
- clientConfigurations.setEngine(null);
- clientConfigurations.setInstanceId(null);
- clientConfigurations.setName(null);
- clientConfigurations.setEngineVersion(null);
- clientConfigurations.setSpecification(null);
- clientConfigurations.setStorageSpace(null);
- clientConfigurations.setPartitionNum(null);
- clientConfigurations.setAccessUser(null);
- clientConfigurations.setPassword(null);
- clientConfigurations.setVpcId(null);
- clientConfigurations.setSecurityGroupId(null);
- clientConfigurations.setSubnetId(null);
- clientConfigurations.setAvailableZones(null);
- clientConfigurations.setProductId(null);
- clientConfigurations.setKafkaManagerUser(null);
- clientConfigurations.setKafkaManagerPassword(null);
- clientConfigurations.setStorageSpecCode(null);
- }
}
diff --git
a/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
b/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
index 2a26a81..4c436ef 100644
---
a/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
+++
b/components/camel-huawei/camel-huaweicloud-functiongraph/src/main/java/org/apache/camel/FunctionGraphProducer.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
public class FunctionGraphProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(FunctionGraphProducer.class);
private FunctionGraphEndpoint endpoint;
- private ClientConfigurations clientConfigurations;
private FunctionGraphClient functionGraphClient;
public FunctionGraphProducer(FunctionGraphEndpoint endpoint) {
@@ -47,16 +46,22 @@ public class FunctionGraphProducer extends DefaultProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.clientConfigurations = new ClientConfigurations(this.endpoint);
- this.functionGraphClient = this.endpoint.initClient();
}
public void process(Exchange exchange) throws Exception {
- updateClientConfigs(exchange);
+ ClientConfigurations clientConfigurations = new
ClientConfigurations(endpoint);
+
+ if (functionGraphClient == null) {
+ LOG.debug("Initializing SDK client");
+ this.functionGraphClient = endpoint.initClient();
+ LOG.debug("Successfully initialized SDK client");
+ }
+
+ updateClientConfigs(exchange, clientConfigurations);
switch (clientConfigurations.getOperation()) {
case FunctionGraphOperations.INVOKE_FUNCTION:
- invokeFunction(exchange);
+ invokeFunction(exchange, clientConfigurations);
break;
default:
throw new UnsupportedOperationException(
@@ -66,10 +71,11 @@ public class FunctionGraphProducer extends DefaultProducer {
/**
* Perform invoke function operation and map return object to exchange body
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void invokeFunction(Exchange exchange) {
+ private void invokeFunction(Exchange exchange, ClientConfigurations
clientConfigurations) {
// convert exchange body to Map object
Object body = exchange.getMessage().getBody();
@@ -117,11 +123,11 @@ public class FunctionGraphProducer extends
DefaultProducer {
* Update dynamic client configurations. Some endpoint parameters
(operation, function name, package, and
* XCFFLogType) can also be passed via exchange properties, so they can be
updated between each transaction. Since
* they can change, we must clear the previous transaction and update
these parameters with their new values
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void updateClientConfigs(Exchange exchange) {
- resetDynamicConfigs();
+ private void updateClientConfigs(Exchange exchange, ClientConfigurations
clientConfigurations) {
// checking for required operation
if
(ObjectHelper.isEmpty(exchange.getProperty(FunctionGraphProperties.OPERATION))
@@ -163,14 +169,4 @@ public class FunctionGraphProducer extends DefaultProducer
{
clientConfigurations.setXCffLogType((String)
exchange.getProperty(FunctionGraphProperties.XCFFLOGTYPE));
}
}
-
- /**
- * Set all dynamic configurations to null
- */
- private void resetDynamicConfigs() {
- clientConfigurations.setOperation(null);
- clientConfigurations.setFunctionName(null);
- clientConfigurations.setFunctionPackage(null);
- clientConfigurations.setXCffLogType(null);
- }
}
diff --git
a/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
b/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
index e959d9a..522a876 100644
---
a/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
+++
b/components/camel-huawei/camel-huaweicloud-iam/src/main/java/org/apache/camel/component/huaweicloud/iam/IAMProducer.java
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
public class IAMProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(IAMProducer.class);
private IAMEndpoint endpoint;
- private ClientConfigurations clientConfigurations;
private IamClient iamClient;
private Gson gson;
@@ -60,32 +59,39 @@ public class IAMProducer extends DefaultProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.clientConfigurations = new ClientConfigurations();
- this.iamClient = this.endpoint.initClient();
this.gson = new Gson();
}
public void process(Exchange exchange) throws Exception {
- updateClientConfigs(exchange);
+
+ ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+ if (this.iamClient == null) {
+ LOG.info("Initializing SDK client");
+ this.iamClient = endpoint.initClient();
+ LOG.info("IAM client initialized");
+ }
+
+ updateClientConfigs(exchange, clientConfigurations);
switch (clientConfigurations.getOperation()) {
case IAMOperations.LIST_USERS:
listUsers(exchange);
break;
case IAMOperations.GET_USER:
- getUser(exchange);
+ getUser(exchange, clientConfigurations);
break;
case IAMOperations.UPDATE_USER:
- updateUser(exchange);
+ updateUser(exchange, clientConfigurations);
break;
case IAMOperations.LIST_GROUPS:
listGroups(exchange);
break;
case IAMOperations.GET_GROUP_USERS:
- getGroupUsers(exchange);
+ getGroupUsers(exchange, clientConfigurations);
break;
case IAMOperations.UPDATE_GROUP:
- updateGroup(exchange);
+ updateGroup(exchange, clientConfigurations);
break;
default:
throw new UnsupportedOperationException(
@@ -107,10 +113,11 @@ public class IAMProducer extends DefaultProducer {
/**
* Perform get user operation
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void getUser(Exchange exchange) {
+ private void getUser(Exchange exchange, ClientConfigurations
clientConfigurations) {
// check for user id, which is mandatory to get user
if (ObjectHelper.isEmpty(clientConfigurations.getUserId())) {
if (LOG.isErrorEnabled()) {
@@ -128,10 +135,11 @@ public class IAMProducer extends DefaultProducer {
/**
* Perform update user operation
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void updateUser(Exchange exchange) {
+ private void updateUser(Exchange exchange, ClientConfigurations
clientConfigurations) {
// checking for valid exchange body containing user information. Body
must be an UpdateUserOption object or a JSON string
Object body = exchange.getMessage().getBody();
UpdateUserOption userOption;
@@ -169,7 +177,7 @@ public class IAMProducer extends DefaultProducer {
/**
* Perform list groups operation
- *
+ *
* @param exchange
*/
private void listGroups(Exchange exchange) {
@@ -181,10 +189,11 @@ public class IAMProducer extends DefaultProducer {
/**
* Perform get group users operation
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void getGroupUsers(Exchange exchange) {
+ private void getGroupUsers(Exchange exchange, ClientConfigurations
clientConfigurations) {
// check for group id, which is mandatory for getting group users
if (ObjectHelper.isEmpty(clientConfigurations.getGroupId())) {
if (LOG.isErrorEnabled()) {
@@ -202,10 +211,11 @@ public class IAMProducer extends DefaultProducer {
/**
* Perform update group operation
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void updateGroup(Exchange exchange) {
+ private void updateGroup(Exchange exchange, ClientConfigurations
clientConfigurations) {
// checking for valid exchange body containing group information. Body
must be an KeystoneUpdateGroupOption object or a JSON string
Object body = exchange.getMessage().getBody();
KeystoneUpdateGroupOption groupOption;
@@ -245,11 +255,11 @@ public class IAMProducer extends DefaultProducer {
* Update dynamic client configurations. Some endpoint parameters
(operation, user ID, and group ID) can also be
* passed via exchange properties, so they can be updated between each
transaction. Since they can change, we must
* clear the previous transaction and update these parameters with their
new values
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void updateClientConfigs(Exchange exchange) {
- resetDynamicConfigs();
+ private void updateClientConfigs(Exchange exchange, ClientConfigurations
clientConfigurations) {
// checking for required operation (exchange overrides endpoint
operation if both are provided)
if (ObjectHelper.isEmpty(exchange.getProperty(IAMProperties.OPERATION))
@@ -283,13 +293,4 @@ public class IAMProducer extends DefaultProducer {
: endpoint.getGroupId());
}
}
-
- /**
- * Set all dynamic configurations to null
- */
- private void resetDynamicConfigs() {
- clientConfigurations.setOperation(null);
- clientConfigurations.setUserId(null);
- clientConfigurations.setGroupId(null);
- }
}
diff --git
a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
index 86f564a..c222ced 100644
---
a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
+++
b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSProducer.java
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory;
public class OBSProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(OBSProducer.class);
private OBSEndpoint endpoint;
- private ClientConfigurations clientConfigurations;
private ObsClient obsClient;
private Gson gson;
@@ -60,32 +59,37 @@ public class OBSProducer extends DefaultProducer {
@Override
protected void doInit() throws Exception {
super.doInit();
- this.clientConfigurations = new ClientConfigurations();
- this.obsClient = this.endpoint.initClient();
this.gson = new Gson();
}
public void process(Exchange exchange) throws Exception {
- updateClientConfigs(exchange);
+
+ ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+ if (obsClient == null) {
+ this.obsClient = endpoint.initClient();
+ }
+
+ updateClientConfigs(exchange, clientConfigurations);
switch (clientConfigurations.getOperation()) {
case OBSOperations.LIST_BUCKETS:
listBuckets(exchange);
break;
case OBSOperations.CREATE_BUCKET:
- createBucket(exchange);
+ createBucket(exchange, clientConfigurations);
break;
case OBSOperations.DELETE_BUCKET:
- deleteBucket(exchange);
+ deleteBucket(exchange, clientConfigurations);
break;
case OBSOperations.CHECK_BUCKET_EXISTS:
- checkBucketExists(exchange);
+ checkBucketExists(exchange, clientConfigurations);
break;
case OBSOperations.GET_BUCKET_METADATA:
- getBucketMetadata(exchange);
+ getBucketMetadata(exchange, clientConfigurations);
break;
case OBSOperations.LIST_OBJECTS:
- listObjects(exchange);
+ listObjects(exchange, clientConfigurations);
break;
default:
throw new UnsupportedOperationException(
@@ -95,8 +99,9 @@ public class OBSProducer extends DefaultProducer {
/**
* Perform list buckets operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @throws ObsException
*/
private void listBuckets(Exchange exchange) throws ObsException {
// invoke list buckets method and map response object to exchange body
@@ -107,10 +112,12 @@ public class OBSProducer extends DefaultProducer {
/**
* Perform create bucket operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws ObsException
*/
- private void createBucket(Exchange exchange) throws ObsException {
+ private void createBucket(Exchange exchange, ClientConfigurations
clientConfigurations) throws ObsException {
CreateBucketRequest request = null;
// checking if user inputted exchange body containing bucket
information. Body must be a CreateBucketRequest or a valid JSON string
(Advanced users)
@@ -153,10 +160,12 @@ public class OBSProducer extends DefaultProducer {
/**
* Perform delete bucket operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws ObsException
*/
- private void deleteBucket(Exchange exchange) throws ObsException {
+ private void deleteBucket(Exchange exchange, ClientConfigurations
clientConfigurations) throws ObsException {
// check for bucket name, which is mandatory to delete a bucket
if (ObjectHelper.isEmpty(clientConfigurations.getBucketName())) {
LOG.error("No bucket name given");
@@ -170,10 +179,12 @@ public class OBSProducer extends DefaultProducer {
/**
* Perform check bucket exists operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws ObsException
*/
- private void checkBucketExists(Exchange exchange) throws ObsException {
+ private void checkBucketExists(Exchange exchange, ClientConfigurations
clientConfigurations) throws ObsException {
// check for bucket name, which is mandatory to check if a bucket
exists
if (ObjectHelper.isEmpty(clientConfigurations.getBucketName())) {
LOG.error("No bucket name given");
@@ -187,10 +198,12 @@ public class OBSProducer extends DefaultProducer {
/**
* Perform get bucket metadata operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws ObsException
*/
- private void getBucketMetadata(Exchange exchange) throws ObsException {
+ private void getBucketMetadata(Exchange exchange, ClientConfigurations
clientConfigurations) throws ObsException {
// check for bucket name, which is mandatory to get bucket metadata
if (ObjectHelper.isEmpty(clientConfigurations.getBucketName())) {
LOG.error("No bucket name given");
@@ -205,10 +218,12 @@ public class OBSProducer extends DefaultProducer {
/**
* Perform list objects operation
- *
- * @param exchange
+ *
+ * @param exchange
+ * @param clientConfigurations
+ * @throws ObsException
*/
- private void listObjects(Exchange exchange) throws ObsException {
+ private void listObjects(Exchange exchange, ClientConfigurations
clientConfigurations) throws ObsException {
ListObjectsRequest request = null;
// checking if user inputted exchange body containing list objects
information. Body must be a ListObjectsRequest or a valid JSON string (Advanced
users)
@@ -251,11 +266,11 @@ public class OBSProducer extends DefaultProducer {
* Update dynamic client configurations. Some endpoint parameters
(operation, and bucket name and location) can also
* be passed via exchange properties, so they can be updated between each
transaction. Since they can change, we
* must clear the previous transaction and update these parameters with
their new values
- *
+ *
* @param exchange
+ * @param clientConfigurations
*/
- private void updateClientConfigs(Exchange exchange) {
- resetDynamicConfigs();
+ private void updateClientConfigs(Exchange exchange, ClientConfigurations
clientConfigurations) {
// checking for required operation (exchange overrides endpoint
operation if both are provided)
if (ObjectHelper.isEmpty(exchange.getProperty(OBSProperties.OPERATION))
@@ -287,13 +302,4 @@ public class OBSProducer extends DefaultProducer {
: endpoint.getBucketLocation());
}
}
-
- /**
- * Set all dynamic configurations to null
- */
- private void resetDynamicConfigs() {
- clientConfigurations.setOperation(null);
- clientConfigurations.setBucketName(null);
- clientConfigurations.setBucketLocation(null);
- }
}
diff --git
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
index 04f14a9..c5de448 100644
---
a/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
+++
b/components/camel-huawei/camel-huaweicloud-smn/src/main/java/org/apache/camel/component/huaweicloud/smn/SimpleNotificationProducer.java
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
public class SimpleNotificationProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(SimpleNotificationProducer.class);
private SmnClient smnClient;
- private ClientConfigurations clientConfigurations;
public SimpleNotificationProducer(SimpleNotificationEndpoint endpoint) {
super(endpoint);
@@ -48,11 +47,16 @@ public class SimpleNotificationProducer extends
DefaultProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- validateAndInitializeSmnClient((SimpleNotificationEndpoint)
super.getEndpoint());
}
public void process(Exchange exchange) throws Exception {
+ ClientConfigurations clientConfigurations = new ClientConfigurations();
+
+ if (smnClient == null) {
+ validateAndInitializeSmnClient((SimpleNotificationEndpoint)
super.getEndpoint(), clientConfigurations);
+ }
+
String service = ((SimpleNotificationEndpoint)
super.getEndpoint()).getSmnService();
if (!ObjectHelper.isEmpty(service)) {
@@ -61,7 +65,8 @@ public class SimpleNotificationProducer extends
DefaultProducer {
if (LOG.isDebugEnabled()) {
LOG.debug("Using message publishing service");
}
-
performPublishMessageServiceOperations((SimpleNotificationEndpoint)
super.getEndpoint(), exchange);
+
performPublishMessageServiceOperations((SimpleNotificationEndpoint)
super.getEndpoint(), exchange,
+ clientConfigurations);
if (LOG.isDebugEnabled()) {
LOG.debug("Completed publishing message");
}
@@ -82,15 +87,19 @@ public class SimpleNotificationProducer extends
DefaultProducer {
/**
* Publish message service operations
- *
+ *
* @param endpoint
* @param exchange
+ * @param clientConfigurations
*/
- private void
performPublishMessageServiceOperations(SimpleNotificationEndpoint endpoint,
Exchange exchange) {
+ private void performPublishMessageServiceOperations(
+ SimpleNotificationEndpoint endpoint,
+ Exchange exchange,
+ ClientConfigurations clientConfigurations) {
PublishMessageResponse response;
PublishMessageRequestBody apiBody;
- this.clientConfigurations = validateServiceConfigurations(endpoint,
exchange);
+ validateServiceConfigurations(endpoint, exchange,
clientConfigurations);
if (LOG.isDebugEnabled()) {
LOG.debug("Checking operation name");
@@ -156,10 +165,13 @@ public class SimpleNotificationProducer extends
DefaultProducer {
/**
* validation and initialization of SmnClient object
- *
+ *
* @param simpleNotificationEndpoint
+ * @param clientConfigurations
*/
- private void validateAndInitializeSmnClient(SimpleNotificationEndpoint
simpleNotificationEndpoint) {
+ private void validateAndInitializeSmnClient(
+ SimpleNotificationEndpoint simpleNotificationEndpoint,
+ ClientConfigurations clientConfigurations) {
if (simpleNotificationEndpoint.getSmnClient() != null) {
if (LOG.isWarnEnabled()) {
LOG.warn(
@@ -168,7 +180,6 @@ public class SimpleNotificationProducer extends
DefaultProducer {
this.smnClient = simpleNotificationEndpoint.getSmnClient();
return;
}
- this.clientConfigurations = new ClientConfigurations();
//checking for cloud SK (secret key)
if (ObjectHelper.isEmpty(simpleNotificationEndpoint.getSecretKey()) &&
@@ -292,15 +303,15 @@ public class SimpleNotificationProducer extends
DefaultProducer {
/**
* validation of all user inputs before attempting to invoke a service
operation
- *
- * @param simpleNotificationEndpoint
- * @param exchange
- * @return
+ *
+ * @param simpleNotificationEndpoint
+ * @param exchange
+ * @param clientConfigurations
*/
- private ClientConfigurations validateServiceConfigurations(
- SimpleNotificationEndpoint simpleNotificationEndpoint, Exchange
exchange) {
+ private void validateServiceConfigurations(
+ SimpleNotificationEndpoint simpleNotificationEndpoint, Exchange
exchange,
+ ClientConfigurations clientConfigurations) {
- ClientConfigurations clientConfigurations = new ClientConfigurations();
if (LOG.isDebugEnabled()) {
LOG.debug("Inspecting exchange body");
}
@@ -369,7 +380,5 @@ public class SimpleNotificationProducer extends
DefaultProducer {
} else {
clientConfigurations.setMessageTtl((int)
exchange.getProperty(SmnProperties.NOTIFICATION_TTL));
}
-
- return clientConfigurations;
}
}