eolivelli commented on a change in pull request #11293:
URL: https://github.com/apache/pulsar/pull/11293#discussion_r668443930



##########
File path: 
pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
##########
@@ -90,6 +95,16 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
         setConfigIfNull(config, 
PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
             topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
 
+        ClientBuilder clientBuilder = sourceContext.getPulsarClientBuilder();
+        ByteArrayOutputStream bao = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(bao)) {

Review comment:
       what about moving this code to some utility method?

##########
File path: 
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
##########
@@ -201,4 +202,14 @@
     default PulsarClient getPulsarClient() {
         throw new UnsupportedOperationException("not implemented");
     }
+
+    /**
+     * Get the pulsar client builder.

Review comment:
       can we add something like "This Builder is pre-configured with 
Connection parameters, Authentication credentials and other context values.
   You can use this Builder in order to connect to the local Pulsar cluster.
   The Function will be responsible for disposing any PulsarClient created by 
this builder." 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to