This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad3e02  ISSUE-5934: Support read/write properties from/to Message in 
flink pulsar consumer/producer (#5955)
5ad3e02 is described below

commit 5ad3e020cf32a0c0a2db79c7471a9bcf7bf1a249
Author: duli559 <554979...@qq.com>
AuthorDate: Thu Mar 26 19:00:25 2020 +0800

    ISSUE-5934: Support read/write properties from/to Message in flink pulsar 
consumer/producer (#5955)
    
    Fix #5934
    
    Motivation
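    Support reading properties from, and writing properties to, Pulsar messages
    in the Flink Pulsar consumer/producer. Reading is customized by overriding
    `PulsarConsumerSource.deserialize` in a derived class; writing is customized
    by supplying a `PulsarPropertiesExtractor`.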
    
    Modifications
    
    1. Change the visibility of `PulsarConsumerSource.deserialize` from `private`
    to `protected` so that derived classes can override it.
    2. Add a `PulsarPropertiesExtractor<T>` parameter to the `FlinkPulsarProducer`
    constructors; the map returned by its `getProperties(T)` is passed to
    `TypedMessageBuilder.properties()` so it is attached to the outgoing Pulsar
    message.
    
    * fix unit test failure
    
    Co-authored-by: herodu <her...@tencent.com>
    Co-authored-by: Sijie Guo <si...@apache.org>
    Co-authored-by: duli <554979...@163.com>
---
 .../example/PulsarConsumerSourceWordCount.java     |  4 ++-
 .../connectors/pulsar/FlinkPulsarProducer.java     | 33 ++++++++++++++++--
 .../connectors/pulsar/PulsarAvroTableSink.java     |  6 +++-
 .../connectors/pulsar/PulsarConsumerSource.java    |  2 +-
 .../connectors/pulsar/PulsarTableSink.java         |  6 +++-
 .../partitioner/PulsarPropertiesExtractor.java     | 40 ++++++++++++++++++++++
 .../connectors/pulsar/PulsarAvroTableSinkTest.java |  5 ++-
 .../connectors/pulsar/PulsarJsonTableSinkTest.java |  5 ++-
 8 files changed, 93 insertions(+), 8 deletions(-)

diff --git 
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
 
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
index a323f8f..2240a4d 100644
--- 
a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
+++ 
b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
 import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 
 /**
@@ -100,7 +101,8 @@ public class PulsarConsumerSourceWordCount {
                 outputTopic,
                 new AuthenticationDisabled(),
                 wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
-                wordWithCount -> wordWithCount.word
+                wordWithCount -> wordWithCount.word,
+                null
             )).setParallelism(parallelism);
         } else {
             // print the results with a single thread, rather than in parallel
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 376439d..cf3c657 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,6 +22,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.function.Function;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -33,6 +34,7 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.util.SerializableObject;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.MessageId;
@@ -68,6 +70,11 @@ public class FlinkPulsarProducer<T>
     protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
 
     /**
+     * User-provided properties extractor for attaching properties to a pulsar 
message.
+     */
+    protected final PulsarPropertiesExtractor<T> 
flinkPulsarPropertiesExtractor;
+
+    /**
      * Produce Mode.
      */
     protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
@@ -110,7 +117,8 @@ public class FlinkPulsarProducer<T>
                                String defaultTopicName,
                                Authentication authentication,
                                SerializationSchema<T> serializationSchema,
-                               PulsarKeyExtractor<T> keyExtractor) {
+                               PulsarKeyExtractor<T> keyExtractor,
+                               PulsarPropertiesExtractor<T> 
propertiesExtractor) {
         checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot 
be blank");
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName 
cannot be blank");
         checkNotNull(authentication, "auth cannot be null, set disabled for no 
auth");
@@ -123,17 +131,20 @@ public class FlinkPulsarProducer<T>
         this.producerConf.setTopicName(defaultTopicName);
         this.schema = checkNotNull(serializationSchema, "Serialization Schema 
not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+        this.flinkPulsarPropertiesExtractor = 
getOrNullPropertiesExtractor(propertiesExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
     }
 
     public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
                                ProducerConfigurationData 
producerConfigurationData,
                                SerializationSchema<T> serializationSchema,
-                               PulsarKeyExtractor<T> keyExtractor) {
+                               PulsarKeyExtractor<T> keyExtractor,
+                               PulsarPropertiesExtractor<T> 
propertiesExtractor) {
         this.clientConf = checkNotNull(clientConfigurationData, "client conf 
can not be null");
         this.producerConf = checkNotNull(producerConfigurationData, "producer 
conf can not be null");
         this.schema = checkNotNull(serializationSchema, "Serialization Schema 
not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
+        this.flinkPulsarPropertiesExtractor = 
getOrNullPropertiesExtractor(propertiesExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
     }
 
@@ -148,6 +159,13 @@ public class FlinkPulsarProducer<T>
     }
 
     /**
+     * @return the properties extractor used to compute the properties of outgoing messages.
+     */
+    public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() {
+        return flinkPulsarPropertiesExtractor;
+    }
+
+    /**
      * Gets this producer's operating mode.
      */
     public PulsarProduceMode getProduceMode() {
@@ -185,6 +203,16 @@ public class FlinkPulsarProducer<T>
         }
     }
 
+    @SuppressWarnings("unchecked") // EMPTY has a raw type; returning it as <T> is an unchecked conversion
+    private static <T> PulsarPropertiesExtractor<T> 
getOrNullPropertiesExtractor(
+            PulsarPropertiesExtractor<T> extractor) { // despite the name, never returns null: null becomes EMPTY
+        if (null == extractor) {
+            return PulsarPropertiesExtractor.EMPTY; // yields an empty property map for every value
+        } else {
+            return extractor;
+        }
+    }
+    }
+
     private Producer<byte[]> createProducer() throws Exception {
         PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
         return client.createProducerAsync(producerConf).get();
@@ -257,6 +285,7 @@ public class FlinkPulsarProducer<T>
             }
         }
         msgBuilder.value(serializedValue)
+                
.properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
                 .sendAsync()
                 .thenApply(successCallback)
                 .exceptionally(failureCallback);
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index d4602d9..51b3572 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.AvroRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -53,6 +54,7 @@ public class PulsarAvroTableSink implements 
AppendStreamTableSink<Row> {
     protected String[] fieldNames;
     protected TypeInformation[] fieldTypes;
     protected PulsarKeyExtractor<Row> keyExtractor;
+    protected PulsarPropertiesExtractor<Row> propertiesExtractor;
     private Class<? extends SpecificRecord> recordClazz;
 
     /**
@@ -106,7 +108,8 @@ public class PulsarAvroTableSink implements 
AppendStreamTableSink<Row> {
                 clientConfigurationData,
                 producerConfigurationData,
                 serializationSchema,
-                keyExtractor);
+                keyExtractor,
+                propertiesExtractor);
     }
 
     @Override
@@ -151,6 +154,7 @@ public class PulsarAvroTableSink implements 
AppendStreamTableSink<Row> {
                 fieldNames,
                 fieldTypes,
                 recordClazz);
+        sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
 
         return sink;
     }
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 9606dfc..9570f17 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -164,7 +164,7 @@ class PulsarConsumerSource<T> extends 
MessageAcknowledgingSourceBase<T, MessageI
         }
     }
 
-    private T deserialize(Message message) throws IOException {
+    protected T deserialize(Message message) throws IOException { // protected so subclasses can override it, e.g. to read message properties
         return deserializer.deserialize(message.getData());
     }
 
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index b89b60b..6d7479c 100644
--- 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -46,6 +47,7 @@ public abstract class PulsarTableSink implements 
AppendStreamTableSink<Row> {
     protected ProducerConfigurationData producerConfigurationData = new 
ProducerConfigurationData();
     protected SerializationSchema<Row> serializationSchema;
     protected PulsarKeyExtractor<Row> keyExtractor;
+    protected PulsarPropertiesExtractor<Row> propertiesExtractor;
     protected String[] fieldNames;
     protected TypeInformation[] fieldTypes;
     protected final String routingKeyFieldName;
@@ -95,7 +97,8 @@ public abstract class PulsarTableSink implements 
AppendStreamTableSink<Row> {
             clientConfigurationData,
             producerConfigurationData,
             serializationSchema,
-            keyExtractor);
+            keyExtractor,
+            propertiesExtractor);
     }
 
     @Override
@@ -141,6 +144,7 @@ public abstract class PulsarTableSink implements 
AppendStreamTableSink<Row> {
                 routingKeyFieldName,
                 fieldNames,
                 fieldTypes);
+        sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;
 
         return sink;
     }
diff --git 
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
new file mode 100644
index 0000000..67e86f3
--- /dev/null
+++ 
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarPropertiesExtractor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Computes the Pulsar message properties to attach to the message produced for a value.
+ */
+public interface PulsarPropertiesExtractor<T> extends Serializable {
+
+    PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap(); // no properties; raw type, so using it as <T> is unchecked
+
+    /**
+     * Returns the properties to attach to the message for {@code in}.
+     *
+     * @param in the value to extract properties from.
+     * @return the properties to attach (presumably non-null: the producer passes it straight to properties()).
+     */
+    Map<String, String> getProperties(T in);
+
+}
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 83a7549..49b50bb 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.pulsar.client.api.Authentication;
@@ -107,12 +108,14 @@ public class PulsarAvroTableSinkTest {
                 Mockito.anyString(),
                 Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
-                Mockito.any(PulsarKeyExtractor.class)
+                Mockito.any(PulsarKeyExtractor.class),
+                Mockito.any(PulsarPropertiesExtractor.class)
         ).thenReturn(producer);
         FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
         FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
         FieldUtils.writeField(sink, "serializationSchema", 
Mockito.mock(SerializationSchema.class), true);
         FieldUtils.writeField(sink, "keyExtractor", 
Mockito.mock(PulsarKeyExtractor.class), true);
+        FieldUtils.writeField(sink, "propertiesExtractor", 
Mockito.mock(PulsarPropertiesExtractor.class), true);
         return sink;
     }
 
diff --git 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 02462b3..d895a5f 100644
--- 
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ 
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import 
org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.pulsar.client.api.Authentication;
@@ -103,13 +104,15 @@ public class PulsarJsonTableSinkTest {
                 Mockito.anyString(),
                 Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
-                Mockito.any(PulsarKeyExtractor.class)
+                Mockito.any(PulsarKeyExtractor.class),
+                Mockito.any(PulsarPropertiesExtractor.class)
         ).thenReturn(producer);
 
         FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
         FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
         FieldUtils.writeField(sink, "serializationSchema", 
Mockito.mock(SerializationSchema.class), true);
         FieldUtils.writeField(sink, "keyExtractor", 
Mockito.mock(PulsarKeyExtractor.class), true);
+        FieldUtils.writeField(sink, "propertiesExtractor", 
Mockito.mock(PulsarPropertiesExtractor.class), true);
         return sink;
     }
 }

Reply via email to