This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ccf99ca Provides singleton example for producer (#540)
1ccf99ca is described below

commit 1ccf99ca3d2ca55649c0a8a3ab8089d910568ba4
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jun 14 13:36:57 2023 +0800

    Provides singleton example for producer (#540)
    
    * Provides singleton example for producer
    
    * Fix style
    
    * Polish code
---
 .../client/java/example/AsyncProducerExample.java  | 29 +------
 .../java/example/AsyncSimpleConsumerExample.java   |  1 +
 .../java/example/ProducerDelayMessageExample.java  | 29 +------
 .../java/example/ProducerFifoMessageExample.java   | 29 +------
 .../java/example/ProducerNormalMessageExample.java | 29 +------
 .../client/java/example/ProducerSingleton.java     | 90 ++++++++++++++++++++++
 .../example/ProducerTransactionMessageExample.java | 32 ++------
 .../client/java/example/PushConsumerExample.java   |  3 +-
 .../client/java/example/SimpleConsumerExample.java |  1 +
 9 files changed, 116 insertions(+), 127 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 7bf80986..0a52c825 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -17,16 +17,12 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -39,29 +35,11 @@ public class AsyncProducerExample {
     private AsyncProducerExample() {
     }
 
-    public static void main(String[] args) throws ClientException, 
IOException, InterruptedException {
+    public static void main(String[] args) throws ClientException, 
InterruptedException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
-        // Credential provider is optional for client configuration.
-        String accessKey = "yourAccessKey";
-        String secretKey = "yourSecretKey";
-        SessionCredentialsProvider sessionCredentialsProvider =
-            new StaticSessionCredentialsProvider(accessKey, secretKey);
-
-        String endpoints = "foobar.com:8080";
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(endpoints)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
         String topic = "yourTopic";
-        // In most case, you don't need to create too many producers, 
singleton pattern is recommended.
-        final Producer producer = provider.newProducerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            // Set the topic name(s), which is optional but recommended. It 
makes producer could prefetch the topic
-            // route before message publishing.
-            .setTopics(topic)
-            // May throw {@link ClientException} if the producer is not 
initialized.
-            .build();
+        final Producer producer = ProducerSingleton.getInstance(topic);
         // Define your message body.
         byte[] body = "This is a normal message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
         String tag = "yourMessageTagA";
@@ -88,6 +66,7 @@ public class AsyncProducerExample {
         // Block to avoid exist of background threads.
         Thread.sleep(Long.MAX_VALUE);
         // Close the producer when you don't need it anymore.
-        producer.close();
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index d19a9bfd..2d96e946 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -112,6 +112,7 @@ public class AsyncSimpleConsumerExample {
             }), receiveCallbackExecutor);
         } while (true);
         // Close the simple consumer when you don't need it anymore.
+        // You could close it manually or add this into the JVM shutdown hook.
         // consumer.close();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
index 24402914..a4dcf961 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerDelayMessageExample.java
@@ -17,14 +17,10 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -37,29 +33,11 @@ public class ProducerDelayMessageExample {
     private ProducerDelayMessageExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException 
{
+    public static void main(String[] args) throws ClientException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
-        // Credential provider is optional for client configuration.
-        String accessKey = "yourAccessKey";
-        String secretKey = "yourSecretKey";
-        SessionCredentialsProvider sessionCredentialsProvider =
-            new StaticSessionCredentialsProvider(accessKey, secretKey);
-
-        String endpoints = "foobar.com:8080";
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(endpoints)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
         String topic = "yourDelayTopic";
-        // In most case, you don't need to create too many producers, 
singleton pattern is recommended.
-        final Producer producer = provider.newProducerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            // Set the topic name(s), which is optional but recommended. It 
makes producer could prefetch the topic
-            // route before message publishing.
-            .setTopics(topic)
-            // May throw {@link ClientException} if the producer is not 
initialized.
-            .build();
+        final Producer producer = ProducerSingleton.getInstance(topic);
         // Define your message body.
         byte[] body = "This is a delay message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
         String tag = "yourMessageTagA";
@@ -82,6 +60,7 @@ public class ProducerDelayMessageExample {
             log.error("Failed to send message", t);
         }
         // Close the producer when you don't need it anymore.
-        producer.close();
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
     }
 }
\ No newline at end of file
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
index e8bbb055..a39a980c 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerFifoMessageExample.java
@@ -17,13 +17,9 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -36,29 +32,11 @@ public class ProducerFifoMessageExample {
     private ProducerFifoMessageExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException 
{
+    public static void main(String[] args) throws ClientException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
-        // Credential provider is optional for client configuration.
-        String accessKey = "yourAccessKey";
-        String secretKey = "yourSecretKey";
-        SessionCredentialsProvider sessionCredentialsProvider =
-            new StaticSessionCredentialsProvider(accessKey, secretKey);
-
-        String endpoints = "foobar.com:8080";
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(endpoints)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
         String topic = "yourFifoTopic";
-        // In most case, you don't need to create too many producers, 
singleton pattern is recommended.
-        final Producer producer = provider.newProducerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            // Set the topic name(s), which is optional but recommended. It 
makes producer could prefetch the topic
-            // route before message publishing.
-            .setTopics(topic)
-            // May throw {@link ClientException} if the producer is not 
initialized.
-            .build();
+        final Producer producer = ProducerSingleton.getInstance(topic);
         // Define your message body.
         byte[] body = "This is a FIFO message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
         String tag = "yourMessageTagA";
@@ -80,6 +58,7 @@ public class ProducerFifoMessageExample {
             log.error("Failed to send message", t);
         }
         // Close the producer when you don't need it anymore.
-        producer.close();
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
index b0dde2ca..b8e510ce 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerNormalMessageExample.java
@@ -17,13 +17,9 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -36,29 +32,11 @@ public class ProducerNormalMessageExample {
     private ProducerNormalMessageExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException 
{
+    public static void main(String[] args) throws ClientException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
-        // Credential provider is optional for client configuration.
-        String accessKey = "yourAccessKey";
-        String secretKey = "yourSecretKey";
-        SessionCredentialsProvider sessionCredentialsProvider =
-            new StaticSessionCredentialsProvider(accessKey, secretKey);
-
-        String endpoints = "foobar.com:8080";
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(endpoints)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
         String topic = "yourNormalTopic";
-        // In most case, you don't need to create too many producers, 
singleton pattern is recommended.
-        final Producer producer = provider.newProducerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            // Set the topic name(s), which is optional but recommended. It 
makes producer could prefetch the topic
-            // route before message publishing.
-            .setTopics(topic)
-            // May throw {@link ClientException} if the producer is not 
initialized.
-            .build();
+        final Producer producer = ProducerSingleton.getInstance(topic);
         // Define your message body.
         byte[] body = "This is a normal message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
         String tag = "yourMessageTagA";
@@ -78,6 +56,7 @@ public class ProducerNormalMessageExample {
             log.error("Failed to send message", t);
         }
         // Close the producer when you don't need it anymore.
-        producer.close();
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
new file mode 100644
index 00000000..76c037ec
--- /dev/null
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.example;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import org.apache.rocketmq.client.apis.producer.TransactionChecker;
+
+/**
+ * Each client will establish an independent connection to the server node 
within a process.
+ *
+ * <p>In most cases, the singleton mode can meet the requirements of higher 
concurrency.
+ * If multiple connections are desired, consider increasing the number of 
clients appropriately.
+ */
+public class ProducerSingleton {
+    private static volatile Producer PRODUCER;
+    private static volatile Producer TRANSACTIONAL_PRODUCER;
+    private static final String ACCESS_KEY = "yourAccessKey";
+    private static final String SECRET_KEY = "yourSecretKey";
+    private static final String ENDPOINTS = "foobar.com:8080";
+
+    private ProducerSingleton() {
+    }
+
+    private static Producer buildProducer(TransactionChecker checker, 
String... topics) throws ClientException {
+        final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
+        // Credential provider is optional for client configuration.
+        // This parameter is necessary only when the server ACL is enabled. 
Otherwise,
+        // it does not need to be set by default.
+        SessionCredentialsProvider sessionCredentialsProvider =
+            new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
+        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
+            .setEndpoints(ENDPOINTS)
+            .setCredentialProvider(sessionCredentialsProvider)
+            .build();
+        final ProducerBuilder builder = provider.newProducerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // Set the topic name(s), which is optional but recommended. It 
lets the producer prefetch
+            // the topic route before message publishing.
+            .setTopics(topics);
+        if (checker != null) {
+            // Set the transaction checker.
+            builder.setTransactionChecker(checker);
+        }
+        return builder.build();
+    }
+
+    public static Producer getInstance(String... topics) throws 
ClientException {
+        if (null == PRODUCER) {
+            synchronized (ProducerSingleton.class) {
+                if (null == PRODUCER) {
+                    PRODUCER = buildProducer(null, topics);
+                }
+            }
+        }
+        return PRODUCER;
+    }
+
+    public static Producer getTransactionalInstance(TransactionChecker checker,
+        String... topics) throws ClientException {
+        if (null == TRANSACTIONAL_PRODUCER) {
+            synchronized (ProducerSingleton.class) {
+                if (null == TRANSACTIONAL_PRODUCER) {
+                    TRANSACTIONAL_PRODUCER = buildProducer(checker, topics);
+                }
+            }
+        }
+        return TRANSACTIONAL_PRODUCER;
+    }
+}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
index a538ae0c..d05f2a99 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerTransactionMessageExample.java
@@ -17,13 +17,9 @@
 
 package org.apache.rocketmq.client.java.example;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -39,35 +35,17 @@ public class ProducerTransactionMessageExample {
     private ProducerTransactionMessageExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException 
{
+    public static void main(String[] args) throws ClientException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
-        // Credential provider is optional for client configuration.
-        String accessKey = "yourAccessKey";
-        String secretKey = "yourSecretKey";
-        SessionCredentialsProvider sessionCredentialsProvider =
-            new StaticSessionCredentialsProvider(accessKey, secretKey);
-
-        String endpoints = "foobar.com:8080";
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(endpoints)
-            .setCredentialProvider(sessionCredentialsProvider)
-            .build();
         String topic = "yourTransactionTopic";
         TransactionChecker checker = messageView -> {
             log.info("Receive transactional message check, message={}", 
messageView);
             // Return the transaction resolution according to your business 
logic.
             return TransactionResolution.COMMIT;
         };
-        // In most case, you don't need to create too many producers, 
singleton pattern is recommended.
-        Producer producer = provider.newProducerBuilder()
-            .setClientConfiguration(clientConfiguration)
-            // Set the topic name(s), which is optional but recommended. It 
makes producer could prefetch the topic
-            // route before message publishing.
-            .setTopics(topic)
-            // Set transactional checker.
-            .setTransactionChecker(checker)
-            .build();
+        // Get producer using singleton pattern.
+        final Producer producer = 
ProducerSingleton.getTransactionalInstance(checker, topic);
         final Transaction transaction = producer.beginTransaction();
         // Define your message body.
         byte[] body = "This is a transaction message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8);
@@ -92,7 +70,9 @@ public class ProducerTransactionMessageExample {
         transaction.commit();
         // Or rollback the transaction.
         // transaction.rollback();
+
         // Close the producer when you don't need it anymore.
-        producer.close();
+        // You could close it manually or add this into the JVM shutdown hook.
+        // producer.close();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
index dab6eb6a..d5119ceb 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
@@ -37,7 +37,7 @@ public class PushConsumerExample {
     private PushConsumerExample() {
     }
 
-    public static void main(String[] args) throws ClientException, 
IOException, InterruptedException {
+    public static void main(String[] args) throws ClientException, 
InterruptedException, IOException {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
 
         // Credential provider is optional for client configuration.
@@ -71,6 +71,7 @@ public class PushConsumerExample {
         // Block the main thread, no need for production environment.
         Thread.sleep(Long.MAX_VALUE);
         // Close the push consumer when you don't need it anymore.
+        // You could close it manually or add this into the JVM shutdown hook.
         pushConsumer.close();
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 7adeace6..e349b82a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -88,6 +88,7 @@ public class SimpleConsumerExample {
             }
         } while (true);
         // Close the simple consumer when you don't need it anymore.
+        // You could close it manually or add this into the JVM shutdown hook.
         // consumer.close();
     }
 }

Reply via email to