srkukarni closed pull request #2490: Attach Producer/Consumer property tags so it's easier to identify topics being produced/consumed by functions URL: https://github.com/apache/incubator-pulsar/pull/2490
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4e34911744..cdcad60eaa 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -553,7 +553,8 @@ public void setupInput(ContextImpl contextImpl) throws Exception { if (sourceSpec.getTimeoutMs() > 0 ) { pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs()); } - object = new PulsarSource(this.client, pulsarSourceConfig); + object = new PulsarSource(this.client, pulsarSourceConfig, + FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); } else { object = Reflections.createInstance( sourceSpec.getClassName(), @@ -599,7 +600,8 @@ public void setupOutput(ContextImpl contextImpl) throws Exception { pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); - object = new PulsarSink(this.client, pulsarSinkConfig); + object = new PulsarSink(this.client, pulsarSinkConfig, + FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); } } else { object = Reflections.createInstance( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java index fa47252d1d..8160a1a190 100644 --- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java @@ -54,23 +54,20 @@ .messageRouter(FunctionResultRouter.of()); } - protected Producer<T> createProducer(String topic, Schema<T> schema) + protected Producer<T> createProducerWithProducerName(String topic, String producerName, Schema<T> schema, String fqfn) throws PulsarClientException { - return createProducer(client, topic, schema); + return createProducer(client, topic, producerName, schema, fqfn); } - public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema) + public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn) throws PulsarClientException { - return newProducerBuilder(client, schema).topic(topic).create(); - } + ProducerBuilder<T> builder = newProducerBuilder(client, schema).topic(topic); + if (producerName != null) { + builder.producerName(producerName); + } - protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema) - throws PulsarClientException { - return createProducer(client, topic, schema, producerName); - } - - public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema, String producerName) - throws PulsarClientException { - return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create(); + return builder + .property("application", "pulsarfunction") + .property("fqfn", fqfn).create(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java index f15df42e5e..3994dd1436 100644 
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java @@ -42,14 +42,17 @@ private final Map<String, Producer<T>> producers; private final Schema<T> schema; + private final String fqfn; public MultiConsumersOneOuputTopicProducers(PulsarClient client, - String outputTopic, Schema<T> schema) + String outputTopic, Schema<T> schema, + String fqfn) throws PulsarClientException { super(client, outputTopic); this.producers = new ConcurrentHashMap<>(); this.schema = schema; + this.fqfn = fqfn; } @Override @@ -65,7 +68,7 @@ static String makeProducerName(String srcTopicName, String srcTopicPartition) { public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException { Producer<T> producer = producers.get(srcPartitionId); if (null == producer) { - producer = createProducer(outputTopic, srcPartitionId, schema); + producer = createProducerWithProducerName(outputTopic, srcPartitionId, schema, fqfn); producers.put(srcPartitionId, producer); } return producer; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index c3df393592..5ec725cdc4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -57,9 +57,10 @@ private PulsarSinkProcessor<T> pulsarSinkProcessor; private final TopicSchema topicSchema; + private final String fqfn; private interface PulsarSinkProcessor<T> { - void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception; + void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception; 
TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception; @@ -72,9 +73,9 @@ private Producer<T> producer; @Override - public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception { + public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception { this.producer = AbstractOneOuputTopicProducers.createProducer( - client, pulsarSinkConfig.getTopic(), schema); + client, pulsarSinkConfig.getTopic(), null, schema, fqfn); } @Override @@ -103,9 +104,9 @@ public void close() throws Exception { private Producer<T> producer; @Override - public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception { + public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception { this.producer = AbstractOneOuputTopicProducers.createProducer( - client, pulsarSinkConfig.getTopic(), schema); + client, pulsarSinkConfig.getTopic(), null, schema, fqfn); } @Override @@ -136,8 +137,8 @@ public void close() throws Exception { protected Producers<T> outputProducer; @Override - public void initializeOutputProducer(String outputTopic, Schema<T> schema) throws Exception { - outputProducer = new MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema); + public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception { + outputProducer = new MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema, fqfn); outputProducer.initialize(); } @@ -195,10 +196,11 @@ public void becameInactive(Consumer<?> consumer, int partitionId) { } } - public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) { + public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) { this.client = client; this.pulsarSinkConfig = pulsarSinkConfig; this.topicSchema = new TopicSchema(client); + this.fqfn = fqfn; } @Override @@ -217,7 +219,7 @@ public void open(Map<String, Object> 
config, SinkContext sinkContext) throws Exc this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(); break; } - this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(), schema); + this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(), schema, fqfn); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index f1e5c015ea..244ab70d4a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -22,10 +22,7 @@ import com.google.common.annotations.VisibleForTesting; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -58,11 +55,14 @@ private List<String> inputTopics; private List<Consumer<T>> inputConsumers; private final TopicSchema topicSchema; + private final String fqfn; - public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) { + public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, + String fqfn) { this.pulsarClient = pulsarClient; this.pulsarSourceConfig = pulsarConfig; this.topicSchema = new TopicSchema(pulsarClient); + this.fqfn = fqfn; } @Override @@ -71,6 +71,10 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws log.info("Opening pulsar source with config: {}", pulsarSourceConfig); Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs(); + Map<String, String> properties = new HashMap<>(); + properties.put("application", "pulsarfunction"); + properties.put("fqfn", fqfn); + inputConsumers = 
configs.entrySet().stream().map(e -> { String topic = e.getKey(); ConsumerConfig<T> conf = e.getValue(); @@ -87,6 +91,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws } else { cb.topic(topic); } + cb.properties(properties); if (pulsarSourceConfig.getTimeoutMs() != null) { cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java index e5af97d060..89efcf594d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java @@ -199,7 +199,7 @@ public void setup() throws Exception { when(mockClient.newProducer(any(Schema.class))) .thenReturn(new MockProducerBuilder()); - producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES); + producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES, "test"); producers.initialize(); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 4891991b6b..766dee389c 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -117,7 +117,7 @@ public void testVoidOutputClasses() throws Exception { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void 
pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); try { pulsarSink.initializeSchema(); @@ -134,7 +134,7 @@ public void testInconsistentOutputType() throws IOException { // set type to be inconsistent to that of SerDe pulsarConfig.setTypeClassName(Integer.class.getName()); pulsarConfig.setSerdeClassName(TestSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); try { pulsarSink.initializeSchema(); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -156,7 +156,7 @@ public void testDefaultSerDe() throws PulsarClientException { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(String.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); try { pulsarSink.initializeSchema(); @@ -175,7 +175,7 @@ public void testExplicitDefaultSerDe() throws PulsarClientException { // set type to void pulsarConfig.setTypeClassName(String.class.getName()); pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); try { pulsarSink.initializeSchema(); @@ -191,7 +191,7 @@ public void testComplexOuputType() throws PulsarClientException { // set type to void pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, 
"test"); try { pulsarSink.initializeSchema(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 7071d58707..60f684fee2 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -127,7 +127,7 @@ public void testVoidInputClasses() throws IOException { PulsarSourceConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); try { pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); @@ -153,7 +153,7 @@ public void testInconsistentInputType() throws IOException { topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build()); pulsarConfig.setTopicSchema(topicSerdeClassNameMap); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); try { pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -178,7 +178,7 @@ public void testDefaultSerDe() throws Exception { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); 
pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @@ -194,7 +194,7 @@ public void testExplicitDefaultSerDe() throws Exception { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @@ -207,7 +207,7 @@ public void testComplexOuputType() throws Exception { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); pulsarSource.setupConsumerConfigs(); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services