This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 1ccf99ca Provides singleton example for producer (#540)
1ccf99ca is described below
commit 1ccf99ca3d2ca55649c0a8a3ab8089d910568ba4
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jun 14 13:36:57 2023 +0800
Provides singleton example for producer (#540)
* Provides singleton example for producer
* Fix style
* Polish code
---
.../client/java/example/AsyncProducerExample.java | 29 +------
.../java/example/AsyncSimpleConsumerExample.java | 1 +
.../java/example/ProducerDelayMessageExample.java | 29 +------
.../java/example/ProducerFifoMessageExample.java | 29 +------
.../java/example/ProducerNormalMessageExample.java | 29 +------
.../client/java/example/ProducerSingleton.java | 90 ++++++++++++++++++++++
.../example/ProducerTransactionMessageExample.java | 32 ++------
.../client/java/example/PushConsumerExample.java | 3 +-
.../client/java/example/SimpleConsumerExample.java | 1 +
9 files changed, 116 insertions(+), 127 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 7bf80986..0a52c825 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -17,16 +17,12 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -39,29 +35,11 @@ public class AsyncProducerExample {
private AsyncProducerExample() {
}
- public static void main(String[] args) throws ClientException,
IOException, InterruptedException {
+ public static void main(String[] args) throws ClientException,
InterruptedException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- // Credential provider is optional for client configuration.
- String accessKey = "yourAccessKey";
- String secretKey = "yourSecretKey";
- SessionCredentialsProvider sessionCredentialsProvider =
- new StaticSessionCredentialsProvider(accessKey, secretKey);
-
- String endpoints = "foobar.com:8080";
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
String topic = "yourTopic";
- // In most case, you don't need to create too many producers,
singleton pattern is recommended.
- final Producer producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- // Set the topic name(s), which is optional but recommended. It
makes producer could prefetch the topic
- // route before message publishing.
- .setTopics(topic)
- // May throw {@link ClientException} if the producer is not
initialized.
- .build();
+ final Producer producer = ProducerSingleton.getInstance(topic);
// Define your message body.
byte[] body = "This is a normal message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
@@ -88,6 +66,7 @@ public class AsyncProducerExample {
// Block to avoid exit of background threads.
Thread.sleep(Long.MAX_VALUE);
// Close the producer when you don't need it anymore.
- producer.close();
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index d19a9bfd..2d96e946 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -112,6 +112,7 @@ public class AsyncSimpleConsumerExample {
}), receiveCallbackExecutor);
} while (true);
// Close the simple consumer when you don't need it anymore.
+ // You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
index 24402914..a4dcf961 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
@@ -17,14 +17,10 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -37,29 +33,11 @@ public class ProducerDelayMessageExample {
private ProducerDelayMessageExample() {
}
- public static void main(String[] args) throws ClientException, IOException
{
+ public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- // Credential provider is optional for client configuration.
- String accessKey = "yourAccessKey";
- String secretKey = "yourSecretKey";
- SessionCredentialsProvider sessionCredentialsProvider =
- new StaticSessionCredentialsProvider(accessKey, secretKey);
-
- String endpoints = "foobar.com:8080";
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
String topic = "yourDelayTopic";
- // In most case, you don't need to create too many producers,
singleton pattern is recommended.
- final Producer producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- // Set the topic name(s), which is optional but recommended. It
makes producer could prefetch the topic
- // route before message publishing.
- .setTopics(topic)
- // May throw {@link ClientException} if the producer is not
initialized.
- .build();
+ final Producer producer = ProducerSingleton.getInstance(topic);
// Define your message body.
byte[] body = "This is a delay message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
@@ -82,6 +60,7 @@ public class ProducerDelayMessageExample {
log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
- producer.close();
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
}
}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
index e8bbb055..a39a980c 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
@@ -17,13 +17,9 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -36,29 +32,11 @@ public class ProducerFifoMessageExample {
private ProducerFifoMessageExample() {
}
- public static void main(String[] args) throws ClientException, IOException
{
+ public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- // Credential provider is optional for client configuration.
- String accessKey = "yourAccessKey";
- String secretKey = "yourSecretKey";
- SessionCredentialsProvider sessionCredentialsProvider =
- new StaticSessionCredentialsProvider(accessKey, secretKey);
-
- String endpoints = "foobar.com:8080";
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
String topic = "yourFifoTopic";
- // In most case, you don't need to create too many producers,
singleton pattern is recommended.
- final Producer producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- // Set the topic name(s), which is optional but recommended. It
makes producer could prefetch the topic
- // route before message publishing.
- .setTopics(topic)
- // May throw {@link ClientException} if the producer is not
initialized.
- .build();
+ final Producer producer = ProducerSingleton.getInstance(topic);
// Define your message body.
byte[] body = "This is a FIFO message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
@@ -80,6 +58,7 @@ public class ProducerFifoMessageExample {
log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
- producer.close();
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
index b0dde2ca..b8e510ce 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
@@ -17,13 +17,9 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -36,29 +32,11 @@ public class ProducerNormalMessageExample {
private ProducerNormalMessageExample() {
}
- public static void main(String[] args) throws ClientException, IOException
{
+ public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- // Credential provider is optional for client configuration.
- String accessKey = "yourAccessKey";
- String secretKey = "yourSecretKey";
- SessionCredentialsProvider sessionCredentialsProvider =
- new StaticSessionCredentialsProvider(accessKey, secretKey);
-
- String endpoints = "foobar.com:8080";
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
String topic = "yourNormalTopic";
- // In most case, you don't need to create too many producers,
singleton pattern is recommended.
- final Producer producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- // Set the topic name(s), which is optional but recommended. It
makes producer could prefetch the topic
- // route before message publishing.
- .setTopics(topic)
- // May throw {@link ClientException} if the producer is not
initialized.
- .build();
+ final Producer producer = ProducerSingleton.getInstance(topic);
// Define your message body.
byte[] body = "This is a normal message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
@@ -78,6 +56,7 @@ public class ProducerNormalMessageExample {
log.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
- producer.close();
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
new file mode 100644
index 00000000..76c037ec
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import org.apache.rocketmq.client.apis.producer.TransactionChecker;
+
+/**
+ * Each client will establish an independent connection to the server node
within a process.
+ *
+ * <p>In most cases, the singleton mode can meet the requirements of higher
concurrency.
+ * If multiple connections are desired, consider increasing the number of
clients appropriately.
+ */
+public class ProducerSingleton {
+ private static volatile Producer PRODUCER;
+ private static volatile Producer TRANSACTIONAL_PRODUCER;
+ private static final String ACCESS_KEY = "yourAccessKey";
+ private static final String SECRET_KEY = "yourSecretKey";
+ private static final String ENDPOINTS = "foobar.com:8080";
+
+ private ProducerSingleton() {
+ }
+
+ private static Producer buildProducer(TransactionChecker checker,
String... topics) throws ClientException {
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ // Credential provider is optional for client configuration.
+ // This parameter is necessary only when the server ACL is enabled.
Otherwise,
+ // it does not need to be set by default.
+ SessionCredentialsProvider sessionCredentialsProvider =
+ new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(ENDPOINTS)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .build();
+ final ProducerBuilder builder = provider.newProducerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ // Set the topic name(s), which is optional but recommended. It
lets the producer prefetch
+ // the topic route before message publishing.
+ .setTopics(topics);
+ if (checker != null) {
+ // Set the transaction checker.
+ builder.setTransactionChecker(checker);
+ }
+ return builder.build();
+ }
+
+ public static Producer getInstance(String... topics) throws
ClientException {
+ if (null == PRODUCER) {
+ synchronized (ProducerSingleton.class) {
+ if (null == PRODUCER) {
+ PRODUCER = buildProducer(null, topics);
+ }
+ }
+ }
+ return PRODUCER;
+ }
+
+ public static Producer getTransactionalInstance(TransactionChecker checker,
+ String... topics) throws ClientException {
+ if (null == TRANSACTIONAL_PRODUCER) {
+ synchronized (ProducerSingleton.class) {
+ if (null == TRANSACTIONAL_PRODUCER) {
+ TRANSACTIONAL_PRODUCER = buildProducer(checker, topics);
+ }
+ }
+ }
+ return TRANSACTIONAL_PRODUCER;
+ }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
index a538ae0c..d05f2a99 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
@@ -17,13 +17,9 @@
package org.apache.rocketmq.client.java.example;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -39,35 +35,17 @@ public class ProducerTransactionMessageExample {
private ProducerTransactionMessageExample() {
}
- public static void main(String[] args) throws ClientException, IOException
{
+ public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- // Credential provider is optional for client configuration.
- String accessKey = "yourAccessKey";
- String secretKey = "yourSecretKey";
- SessionCredentialsProvider sessionCredentialsProvider =
- new StaticSessionCredentialsProvider(accessKey, secretKey);
-
- String endpoints = "foobar.com:8080";
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
String topic = "yourTransactionTopic";
TransactionChecker checker = messageView -> {
log.info("Receive transactional message check, message={}",
messageView);
// Return the transaction resolution according to your business
logic.
return TransactionResolution.COMMIT;
};
- // In most case, you don't need to create too many producers,
singleton pattern is recommended.
- Producer producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- // Set the topic name(s), which is optional but recommended. It
makes producer could prefetch the topic
- // route before message publishing.
- .setTopics(topic)
- // Set transactional checker.
- .setTransactionChecker(checker)
- .build();
+ // Get producer using singleton pattern.
+ final Producer producer =
ProducerSingleton.getTransactionalInstance(checker, topic);
final Transaction transaction = producer.beginTransaction();
// Define your message body.
byte[] body = "This is a transaction message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8);
@@ -92,7 +70,9 @@ public class ProducerTransactionMessageExample {
transaction.commit();
// Or rollback the transaction.
// transaction.rollback();
+
// Close the producer when you don't need it anymore.
- producer.close();
+ // You could close it manually or add this into the JVM shutdown hook.
+ // producer.close();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
index dab6eb6a..d5119ceb 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
@@ -37,7 +37,7 @@ public class PushConsumerExample {
private PushConsumerExample() {
}
- public static void main(String[] args) throws ClientException,
IOException, InterruptedException {
+ public static void main(String[] args) throws ClientException,
InterruptedException, IOException {
final ClientServiceProvider provider =
ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
@@ -71,6 +71,7 @@ public class PushConsumerExample {
// Block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// Close the push consumer when you don't need it anymore.
+ // You could close it manually or add this into the JVM shutdown hook.
pushConsumer.close();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 7adeace6..e349b82a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -88,6 +88,7 @@ public class SimpleConsumerExample {
}
} while (true);
// Close the simple consumer when you don't need it anymore.
+ // You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}