This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4956859  [FLINK-26931][Connector/pulsar] Make the producer name and consumer name unique for each instance.
4956859 is described below

commit 495685970e31085815ba0435322ab44e4504cd55
Author: Yufan Sheng <[email protected]>
AuthorDate: Wed Mar 30 20:19:51 2022 +0800

    [FLINK-26931][Connector/pulsar] Make the producer name and consumer name unique for each instance.
---
 .../pulsar/common/config/PulsarClientFactory.java      | 18 ++++++++++--------
 .../pulsar/common/config/PulsarConfiguration.java      |  2 +-
 .../flink/connector/pulsar/sink/PulsarSinkBuilder.java |  5 +++++
 .../pulsar/sink/config/PulsarSinkConfigUtils.java      |  6 +++++-
 .../connector/pulsar/sink/writer/PulsarWriter.java     |  2 +-
 .../connector/pulsar/source/PulsarSourceBuilder.java   |  7 ++++++-
 .../pulsar/source/config/PulsarSourceConfigUtils.java  |  6 +++++-
 .../flink/connector/pulsar/sink/PulsarSinkITCase.java  |  2 +-
 8 files changed, 34 insertions(+), 14 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
index b1214b5..1f01b24 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
@@ -153,7 +153,7 @@ public final class PulsarClientFactory {
 
     /**
      * PulsarAdmin shares almost the same configuration with PulsarClient, but 
we separate this
-     * create method for directly creating it.
+     * creation method so that it can be used directly.
      */
     public static PulsarAdmin createAdmin(PulsarConfiguration configuration) {
         PulsarAdminBuilder builder = PulsarAdmin.builder();
@@ -200,15 +200,17 @@ public final class PulsarClientFactory {
                 String authParamsString = 
configuration.get(PULSAR_AUTH_PARAMS);
                 return sneakyClient(
                         () -> 
AuthenticationFactory.create(authPluginClassName, authParamsString));
-            } else if (configuration.contains(PULSAR_AUTH_PARAM_MAP)) {
-                Map<String, String> paramsMap = 
configuration.get(PULSAR_AUTH_PARAM_MAP);
+            } else {
+                Map<String, String> paramsMap = 
configuration.getProperties(PULSAR_AUTH_PARAM_MAP);
+                if (paramsMap.isEmpty()) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "No %s or %s provided",
+                                    PULSAR_AUTH_PARAMS.key(), 
PULSAR_AUTH_PARAM_MAP.key()));
+                }
+
                 return sneakyClient(
                         () -> 
AuthenticationFactory.create(authPluginClassName, paramsMap));
-            } else {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "No %s or %s provided",
-                                PULSAR_AUTH_PARAMS.key(), 
PULSAR_AUTH_PARAM_MAP.key()));
             }
         }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
index 3e64c66..0681a3e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfiguration.java
@@ -76,7 +76,7 @@ public abstract class PulsarConfiguration extends 
UnmodifiableConfiguration {
         return properties;
     }
 
-    /** Get an option value from the given config, convert it into the a new 
value instance. */
+    /** Get an option value from the given config, convert it into a new value 
instance. */
     public <F, T> T get(ConfigOption<F> option, Function<F, T> convertor) {
         F value = get(option);
         if (value != null) {
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index 1668e3d..332a27a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -315,6 +315,11 @@ public class PulsarSinkBuilder<IN> {
         if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
             LOG.warn(
                     "We recommend set a readable producer name through 
setProducerName(String) in production mode.");
+        } else {
+            String producerName = configBuilder.get(PULSAR_PRODUCER_NAME);
+            if (!producerName.contains("%s")) {
+                configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " 
- %s");
+            }
         }
 
         checkNotNull(serializationSchema, "serializationSchema must be set.");
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 13821fe..50f0e15 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 
 import java.util.Map;
+import java.util.UUID;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -70,7 +71,10 @@ public final class PulsarSinkConfigUtils {
             PulsarClient client, Schema<T> schema, SinkConfiguration 
configuration) {
         ProducerBuilder<T> builder = client.newProducer(schema);
 
-        configuration.useOption(PULSAR_PRODUCER_NAME, builder::producerName);
+        configuration.useOption(
+                PULSAR_PRODUCER_NAME,
+                producerName -> String.format(producerName, UUID.randomUUID()),
+                builder::producerName);
         configuration.useOption(
                 PULSAR_SEND_TIMEOUT_MS,
                 Math::toIntExact,
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 1e4113a..dec4f52 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -189,7 +189,7 @@ public class PulsarWriter<IN> implements 
PrecommittingSinkWriter<IN, PulsarCommi
         this.pendingMessages -= 1;
     }
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private TypedMessageBuilder<?> createMessageBuilder(
             String topic, Context context, PulsarMessage<?> message) {
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 517286d..ed68215 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -473,9 +473,14 @@ public final class PulsarSourceBuilder<OUT> {
         if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
             LOG.warn(
                     "We recommend set a readable consumer name through 
setConsumerName(String) in production mode.");
+        } else {
+            String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
+            if (!consumerName.contains("%s")) {
+                configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " 
- %s");
+            }
         }
 
-        // Since these implementation could be a lambda, make sure they are 
serializable.
+        // Since these implementations could be a lambda, make sure they are 
serializable.
         checkState(isSerializable(startCursor), "StartCursor isn't 
serializable");
         checkState(isSerializable(stopCursor), "StopCursor isn't 
serializable");
         checkState(isSerializable(rangeGenerator), "RangeGenerator isn't 
serializable");
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index adb8a03..602a157 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Schema;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -106,7 +107,10 @@ public final class PulsarSourceConfigUtils {
         configuration.useOption(
                 PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
                 builder::maxTotalReceiverQueueSizeAcrossPartitions);
-        configuration.useOption(PULSAR_CONSUMER_NAME, builder::consumerName);
+        configuration.useOption(
+                PULSAR_CONSUMER_NAME,
+                consumerName -> String.format(consumerName, UUID.randomUUID()),
+                builder::consumerName);
         configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
         configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
         configuration.useOption(
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 2c2c05d..e5da329 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -61,7 +61,7 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(DeliveryGuarantee.class)
     void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
+        // A random topic with 4 partitions.
         String topic = randomAlphabetic(8);
         operator().createTopic(topic, 4);
         int counts = ThreadLocalRandom.current().nextInt(100, 200);

Reply via email to