This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5ad3e02 ISSUE-5934: Support read/write properties from/to Message in flink pulsar consumer/producer (#5955) 5ad3e02 is described below commit 5ad3e020cf32a0c0a2db79c7471a9bcf7bf1a249 Author: duli559 <554979...@qq.com> AuthorDate: Thu Mar 26 19:00:25 2020 +0800 ISSUE-5934: Support read/write properties from/to Message in flink pulsar consumer/producer (#5955) Fix #5934 Motivation Support read/write properties from/to Message in flink pulsar consumer/producer, and you can override it in your derived class Modifications 1. modify `PulsarConsumerSource.deserialize` access right from 'private' to 'protected'. 2. add method `protected Map<String, String> generateProperties(T value)` in class `FlinkPulsarProducer`, and invoked in `TypedMessageBuilder.properties()` to add it in pulsar Message. * fix unit test failure Co-authored-by: herodu <her...@tencent.com> Co-authored-by: Sijie Guo <si...@apache.org> Co-authored-by: duli <554979...@163.com> --- .../example/PulsarConsumerSourceWordCount.java | 4 ++- .../connectors/pulsar/FlinkPulsarProducer.java | 33 ++++++++++++++++-- .../connectors/pulsar/PulsarAvroTableSink.java | 6 +++- .../connectors/pulsar/PulsarConsumerSource.java | 2 +- .../connectors/pulsar/PulsarTableSink.java | 6 +++- .../partitioner/PulsarPropertiesExtractor.java | 40 ++++++++++++++++++++++ .../connectors/pulsar/PulsarAvroTableSinkTest.java | 5 ++- .../connectors/pulsar/PulsarJsonTableSinkTest.java | 5 ++- 8 files changed, 93 insertions(+), 8 deletions(-) diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java index a323f8f..2240a4d 100644 --- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java +++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer; import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; /** @@ -100,7 +101,8 @@ public class PulsarConsumerSourceWordCount { outputTopic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes(UTF_8), - wordWithCount -> wordWithCount.word + wordWithCount -> wordWithCount.word, + null )).setParallelism(parallelism); } else { // print the results with a single thread, rather than in parallel diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java index 376439d..cf3c657 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java @@ -22,6 +22,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import java.util.function.Function; + import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -33,6 +34,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor; import org.apache.flink.util.SerializableObject; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.MessageId; @@ -68,6 +70,11 @@ public class FlinkPulsarProducer<T> protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor; /** + * User-provided properties extractor for assigning a key to a pulsar message. + */ + protected final PulsarPropertiesExtractor<T> flinkPulsarPropertiesExtractor; + + /** * Produce Mode. */ protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE; @@ -110,7 +117,8 @@ public class FlinkPulsarProducer<T> String defaultTopicName, Authentication authentication, SerializationSchema<T> serializationSchema, - PulsarKeyExtractor<T> keyExtractor) { + PulsarKeyExtractor<T> keyExtractor, + PulsarPropertiesExtractor<T> propertiesExtractor) { checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank"); checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank"); checkNotNull(authentication, "auth cannot be null, set disabled for no auth"); @@ -123,17 +131,20 @@ public class FlinkPulsarProducer<T> this.producerConf.setTopicName(defaultTopicName); this.schema = checkNotNull(serializationSchema, "Serialization Schema not set"); this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor); + this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor); ClosureCleaner.ensureSerializable(serializationSchema); } public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData, SerializationSchema<T> serializationSchema, - PulsarKeyExtractor<T> keyExtractor) { + PulsarKeyExtractor<T> keyExtractor, + PulsarPropertiesExtractor<T> propertiesExtractor) { this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null"); this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null"); this.schema = checkNotNull(serializationSchema, "Serialization Schema not set"); this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor); + this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor); ClosureCleaner.ensureSerializable(serializationSchema); } @@ -148,6 +159,13 @@ public class FlinkPulsarProducer<T> } /** + * @return pulsar properties extractor. + */ + public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() { + return flinkPulsarPropertiesExtractor; + } + + /** * Gets this producer's operating mode. */ public PulsarProduceMode getProduceMode() { @@ -185,6 +203,16 @@ public class FlinkPulsarProducer<T> } } + @SuppressWarnings("unchecked") + private static <T> PulsarPropertiesExtractor<T> getOrNullPropertiesExtractor( + PulsarPropertiesExtractor<T> extractor) { + if (null == extractor) { + return PulsarPropertiesExtractor.EMPTY; + } else { + return extractor; + } + } + private Producer<byte[]> createProducer() throws Exception { PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf); return client.createProducerAsync(producerConf).get(); @@ -257,6 +285,7 @@ public class FlinkPulsarProducer<T> } } msgBuilder.value(serializedValue) + .properties(this.flinkPulsarPropertiesExtractor.getProperties(value)) .sendAsync() .thenApply(successCallback) .exceptionally(failureCallback); diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java index d4602d9..51b3572 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java @@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.formats.avro.AvroRowSerializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; @@ -53,6 +54,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> { protected String[] fieldNames; protected TypeInformation[] fieldTypes; protected PulsarKeyExtractor<Row> keyExtractor; + protected PulsarPropertiesExtractor<Row> propertiesExtractor; private Class<? extends SpecificRecord> recordClazz; /** @@ -106,7 +108,8 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> { clientConfigurationData, producerConfigurationData, serializationSchema, - keyExtractor); + keyExtractor, + propertiesExtractor); } @Override @@ -151,6 +154,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> { fieldNames, fieldTypes, recordClazz); + sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY; return sink; } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java index 9606dfc..9570f17 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java @@ -164,7 +164,7 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI } } - private T deserialize(Message message) throws IOException { + protected T deserialize(Message message) throws IOException { return deserializer.deserialize(message.getData()); } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java index b89b60b..6d7479c 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; @@ -46,6 +47,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> { protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); protected SerializationSchema<Row> serializationSchema; protected PulsarKeyExtractor<Row> keyExtractor; + protected PulsarPropertiesExtractor<Row> propertiesExtractor; protected String[] fieldNames; protected TypeInformation[] fieldTypes; protected final String routingKeyFieldName; @@ -95,7 +97,8 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> { clientConfigurationData, producerConfigurationData, serializationSchema, - keyExtractor); + keyExtractor, + propertiesExtractor); } @Override @@ -141,6 +144,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> { routingKeyFieldName, fieldNames, fieldTypes); + sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY; return sink; } diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java new file mode 100644 index 0000000..67e86f3 --- /dev/null +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flink.streaming.connectors.pulsar.partitioner; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; + +/** + * Extract message properties from a value or others. + */ +public interface PulsarPropertiesExtractor<T> extends Serializable { + + PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap(); + + /** + * Retrieve properties from the value or others. + * + * @param in the value to extract a key. + * @return key. + */ + Map<String, String> getProperties(T in); + +} diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java index 83a7549..49b50bb 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.avro.generated.NasaMission; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; @@ -107,12 +108,14 @@ public class PulsarAvroTableSinkTest { Mockito.anyString(), Mockito.any(Authentication.class), Mockito.any(SerializationSchema.class), - Mockito.any(PulsarKeyExtractor.class) + Mockito.any(PulsarKeyExtractor.class), + Mockito.any(PulsarPropertiesExtractor.class) ).thenReturn(producer); FieldUtils.writeField(sink, "fieldNames", fieldNames, true); FieldUtils.writeField(sink, "fieldTypes", typeInformations, true); FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true); FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true); + FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true); return sink; } diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java index 02462b3..d895a5f 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.pulsar.client.api.Authentication; @@ -103,13 +104,15 @@ public class PulsarJsonTableSinkTest { Mockito.anyString(), Mockito.any(Authentication.class), Mockito.any(SerializationSchema.class), - Mockito.any(PulsarKeyExtractor.class) + Mockito.any(PulsarKeyExtractor.class), + Mockito.any(PulsarPropertiesExtractor.class) ).thenReturn(producer); FieldUtils.writeField(sink, "fieldNames", fieldNames, true); FieldUtils.writeField(sink, "fieldTypes", typeInformations, true); FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true); FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true); + FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true); return sink; } }