This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4101168 [Flink] Allow to customize PulsarProducer (#3955)
4101168 is described below
commit 4101168fbf5689af0eac37e1fc03195778f544d5
Author: Cristian <[email protected]>
AuthorDate: Mon Apr 1 21:15:44 2019 -0700
[Flink] Allow to customize PulsarProducer (#3955)
This is an improvement over #3894.
Because of how Flink instantiates functions, we need to pass a
serializable object instead of a custom `PulsarProducer` client.
The current implementation always falls back to calling
`createProducer()` because `producer` is `transient`, so it is
always null when Flink creates new instances of the sink.
---
.../connectors/pulsar/FlinkPulsarProducer.java | 23 +++++++++++++++-------
1 file changed, 16 insertions(+), 7 deletions(-)
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 04e69f8..c0d3905 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,6 +22,7 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.util.function.Function;
+import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -37,8 +38,10 @@ import
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtract
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +76,11 @@ public class FlinkPulsarProducer<IN>
protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
/**
+ * {@link Producer} configuration map (will be materialized as a {@link
ProducerConfigurationData} instance)
+ */
+ protected final Map<String, Object> producerConfig;
+
+ /**
* Produce Mode.
*/
protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
@@ -122,7 +130,7 @@ public class FlinkPulsarProducer<IN>
String defaultTopicName,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor,
- Producer<byte[]> producer) {
+ Map<String, Object> producerConfig) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot
be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName
cannot be blank");
this.serviceUrl = serviceUrl;
@@ -130,7 +138,7 @@ public class FlinkPulsarProducer<IN>
this.schema = checkNotNull(serializationSchema, "Serialization Schema
not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
- this.producer = producer;
+ this.producerConfig = producerConfig;
}
// ---------------------------------- Properties --------------------------
@@ -183,7 +191,11 @@ public class FlinkPulsarProducer<IN>
private Producer<byte[]> createProducer() throws Exception {
PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
- return client.newProducer().topic(defaultTopicName).create();
+ ProducerBuilder<byte[]> producerBuilder = client.newProducer();
+ if (producerConfig != null) {
+ producerBuilder = producerBuilder.loadConf(producerConfig);
+ }
+ return producerBuilder.topic(defaultTopicName).create();
}
/**
@@ -194,10 +206,7 @@ public class FlinkPulsarProducer<IN>
*/
@Override
public void open(Configuration parameters) throws Exception {
- if (producer == null) {
- // If no custom producer was specified create a default one
- this.producer = createProducer();
- }
+ this.producer = createProducer();
RuntimeContext ctx = getRuntimeContext();