srkukarni closed pull request #2490: Attach Producer/Consumer property tags so 
it's easier to identify topics being produced/consumed by functions
URL: https://github.com/apache/incubator-pulsar/pull/2490
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4e34911744..cdcad60eaa 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -553,7 +553,8 @@ public void setupInput(ContextImpl contextImpl) throws 
Exception {
             if (sourceSpec.getTimeoutMs() > 0 ) {
                 pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
             }
-            object = new PulsarSource(this.client, pulsarSourceConfig);
+            object = new PulsarSource(this.client, pulsarSourceConfig,
+                    
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
         } else {
             object = Reflections.createInstance(
                     sourceSpec.getClassName(),
@@ -599,7 +600,8 @@ public void setupOutput(ContextImpl contextImpl) throws 
Exception {
 
                 pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
 
-                object = new PulsarSink(this.client, pulsarSinkConfig);
+                object = new PulsarSink(this.client, pulsarSinkConfig,
+                        
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
             }
         } else {
             object = Reflections.createInstance(
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index fa47252d1d..8160a1a190 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -54,23 +54,20 @@
                 .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer<T> createProducer(String topic, Schema<T> schema)
+    protected Producer<T> createProducerWithProducerName(String topic, String 
producerName, Schema<T> schema, String fqfn)
             throws PulsarClientException {
-        return createProducer(client, topic, schema);
+        return createProducer(client, topic, producerName, schema, fqfn);
     }
 
-    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, Schema<T> schema)
+    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, String producerName, Schema<T> schema, String fqfn)
             throws PulsarClientException {
-        return newProducerBuilder(client, schema).topic(topic).create();
-    }
+        ProducerBuilder<T> builder = newProducerBuilder(client, 
schema).topic(topic);
+        if (producerName != null) {
+            builder.producerName(producerName);
+        }
 
-    protected Producer<T> createProducer(String topic, String producerName, 
Schema<T> schema)
-            throws PulsarClientException {
-        return createProducer(client, topic, schema, producerName);
-    }
-
-    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, Schema<T> schema, String producerName)
-            throws PulsarClientException {
-        return newProducerBuilder(client, 
schema).topic(topic).producerName(producerName).create();
+        return builder
+                .property("application", "pulsarfunction")
+                .property("fqfn", fqfn).create();
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index f15df42e5e..3994dd1436 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -42,14 +42,17 @@
     private final Map<String, Producer<T>> producers;
 
     private final Schema<T> schema;
+    private final String fqfn;
 
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
-                                                String outputTopic, Schema<T> 
schema)
+                                                String outputTopic, Schema<T> 
schema,
+                                                String fqfn)
             throws PulsarClientException {
         super(client, outputTopic);
         this.producers = new ConcurrentHashMap<>();
         this.schema = schema;
+        this.fqfn = fqfn;
     }
 
     @Override
@@ -65,7 +68,7 @@ static String makeProducerName(String srcTopicName, String 
srcTopicPartition) {
     public synchronized Producer<T> getProducer(String srcPartitionId) throws 
PulsarClientException {
         Producer<T> producer = producers.get(srcPartitionId);
         if (null == producer) {
-            producer = createProducer(outputTopic, srcPartitionId, schema);
+            producer = createProducerWithProducerName(outputTopic, 
srcPartitionId, schema, fqfn);
             producers.put(srcPartitionId, producer);
         }
         return producer;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index c3df393592..5ec725cdc4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -57,9 +57,10 @@
     private PulsarSinkProcessor<T> pulsarSinkProcessor;
 
     private final TopicSchema topicSchema;
+    private final String fqfn;
 
     private interface PulsarSinkProcessor<T> {
-        void initializeOutputProducer(String outputTopic, Schema<T> schema) 
throws Exception;
+        void initializeOutputProducer(String outputTopic, Schema<T> schema, 
String fqfn) throws Exception;
 
         TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;
 
@@ -72,9 +73,9 @@
         private Producer<T> producer;
 
         @Override
-        public void initializeOutputProducer(String outputTopic, Schema<T> 
schema) throws Exception {
+        public void initializeOutputProducer(String outputTopic, Schema<T> 
schema, String fqfn) throws Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), schema);
+                    client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
         }
 
         @Override
@@ -103,9 +104,9 @@ public void close() throws Exception {
         private Producer<T> producer;
 
         @Override
-        public void initializeOutputProducer(String outputTopic, Schema<T> 
schema) throws Exception {
+        public void initializeOutputProducer(String outputTopic, Schema<T> 
schema, String fqfn) throws Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), schema);
+                    client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
         }
 
         @Override
@@ -136,8 +137,8 @@ public void close() throws Exception {
         protected Producers<T> outputProducer;
 
         @Override
-        public void initializeOutputProducer(String outputTopic, Schema<T> 
schema) throws Exception {
-            outputProducer = new 
MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema);
+        public void initializeOutputProducer(String outputTopic, Schema<T> 
schema, String fqfn) throws Exception {
+            outputProducer = new 
MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema, fqfn);
             outputProducer.initialize();
         }
 
@@ -195,10 +196,11 @@ public void becameInactive(Consumer<?> consumer, int 
partitionId) {
         }
     }
 
-    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) {
+    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, 
String fqfn) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
         this.topicSchema = new TopicSchema(client);
+        this.fqfn = fqfn;
     }
 
     @Override
@@ -217,7 +219,7 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
                 this.pulsarSinkProcessor = new 
PulsarSinkEffectivelyOnceProcessor();
                 break;
         }
-        
this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(),
 schema);
+        
this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(),
 schema, fqfn);
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f1e5c015ea..244ab70d4a 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -22,10 +22,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -58,11 +55,14 @@
     private List<String> inputTopics;
     private List<Consumer<T>> inputConsumers;
     private final TopicSchema topicSchema;
+    private final String fqfn;
 
-    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig) {
+    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig,
+                        String fqfn) {
         this.pulsarClient = pulsarClient;
         this.pulsarSourceConfig = pulsarConfig;
         this.topicSchema = new TopicSchema(pulsarClient);
+        this.fqfn = fqfn;
     }
 
     @Override
@@ -71,6 +71,10 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
         log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
         Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();
 
+        Map<String, String> properties = new HashMap<>();
+        properties.put("application", "pulsarfunction");
+        properties.put("fqfn", fqfn);
+
         inputConsumers = configs.entrySet().stream().map(e -> {
             String topic = e.getKey();
             ConsumerConfig<T> conf = e.getValue();
@@ -87,6 +91,7 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
             } else {
                 cb.topic(topic);
             }
+            cb.properties(properties);
 
             if (pulsarSourceConfig.getTimeoutMs() != null) {
                 cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index e5af97d060..89efcf594d 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -199,7 +199,7 @@ public void setup() throws Exception {
         when(mockClient.newProducer(any(Schema.class)))
             .thenReturn(new MockProducerBuilder());
 
-        producers = new 
MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, 
Schema.BYTES);
+        producers = new 
MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, 
Schema.BYTES, "test");
         producers.initialize();
     }
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 4891991b6b..766dee389c 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -117,7 +117,7 @@ public void testVoidOutputClasses() throws Exception {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, "test");
 
         try {
             pulsarSink.initializeSchema();
@@ -134,7 +134,7 @@ public void testInconsistentOutputType() throws IOException 
{
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, "test");
         try {
             pulsarSink.initializeSchema();
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
@@ -156,7 +156,7 @@ public void testDefaultSerDe() throws PulsarClientException 
{
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, "test");
 
         try {
             pulsarSink.initializeSchema();
@@ -175,7 +175,7 @@ public void testExplicitDefaultSerDe() throws 
PulsarClientException {
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
         pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, "test");
 
         try {
             pulsarSink.initializeSchema();
@@ -191,7 +191,7 @@ public void testComplexOuputType() throws 
PulsarClientException {
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
         pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, "test");
 
         try {
             pulsarSink.initializeSchema();
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 7071d58707..60f684fee2 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -127,7 +127,7 @@ public void testVoidInputClasses() throws IOException {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig, "test");
 
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -153,7 +153,7 @@ public void testInconsistentInputType() throws IOException {
         
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
                 
ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig, "test");
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
@@ -178,7 +178,7 @@ public void testDefaultSerDe() throws Exception {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig, "test");
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
@@ -194,7 +194,7 @@ public void testExplicitDefaultSerDe() throws Exception {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig, "test");
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
@@ -207,7 +207,7 @@ public void testComplexOuputType() throws Exception {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 
ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig, "test");
 
         pulsarSource.setupConsumerConfigs();
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to