This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4101168  [Flink] Allow to customize PulsarProducer (#3955)
4101168 is described below

commit 4101168fbf5689af0eac37e1fc03195778f544d5
Author: Cristian <[email protected]>
AuthorDate: Mon Apr 1 21:15:44 2019 -0700

    [Flink] Allow to customize PulsarProducer (#3955)
    
    This is an improvement over #3894.
    
    Because of how Flink instantiates functions, we need to pass a
    serializable object instead of a custom `PulsarProducer` client.
    The current implementation always falls back to calling
    `createProducer()` because `producer` is `transient`, so it is
    always null when Flink creates new instances of the sink.
---
 .../connectors/pulsar/FlinkPulsarProducer.java     | 23 +++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 04e69f8..c0d3905 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,6 +22,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.function.Function;
+import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -37,8 +38,10 @@ import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract
 import org.apache.flink.util.SerializableObject;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +76,11 @@ public class FlinkPulsarProducer<IN>
     protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
 
     /**
+     * {@link Producer} configuration map (will be materialized as a {@link ProducerConfigurationData} instance)
+     */
+    protected final Map<String, Object> producerConfig;
+
+    /**
      * Produce Mode.
      */
     protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
@@ -122,7 +130,7 @@ public class FlinkPulsarProducer<IN>
                                String defaultTopicName,
                                SerializationSchema<IN> serializationSchema,
                                PulsarKeyExtractor<IN> keyExtractor,
-                               Producer<byte[]> producer) {
+                               Map<String, Object> producerConfig) {
         checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
         this.serviceUrl = serviceUrl;
@@ -130,7 +138,7 @@ public class FlinkPulsarProducer<IN>
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
-        this.producer = producer;
+        this.producerConfig = producerConfig;
     }
 
     // ---------------------------------- Properties --------------------------
@@ -183,7 +191,11 @@ public class FlinkPulsarProducer<IN>
 
     private Producer<byte[]> createProducer() throws Exception {
         PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-        return client.newProducer().topic(defaultTopicName).create();
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer();
+        if (producerConfig != null) {
+            producerBuilder = producerBuilder.loadConf(producerConfig);
+        }
+        return producerBuilder.topic(defaultTopicName).create();
     }
 
     /**
@@ -194,10 +206,7 @@ public class FlinkPulsarProducer<IN>
      */
     @Override
     public void open(Configuration parameters) throws Exception {
-        if (producer == null) {
-            // If no custom producer was specified create a default one
-            this.producer = createProducer();
-        }
+        this.producer = createProducer();
 
         RuntimeContext ctx = getRuntimeContext();
 

Reply via email to