This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3939bce  PIP-85: [pulsar-io] pass pulsar client via context to 
connector (#11056)
3939bce is described below

commit 3939bce49ff7bc50f7a83910ff77e88c1b31cc39
Author: Neng Lu <[email protected]>
AuthorDate: Fri Jul 2 11:53:49 2021 -0700

    PIP-85: [pulsar-io] pass pulsar client via context to connector (#11056)
    
    ### Motivation
    
    Fixes #8668
    
    ### Modifications
    
    Expose `PulsarClient` via `BaseContext`, and allow connectors to use the 
inherited pulsar client from function worker to produce/consume messages.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change is already covered by existing tests, such as:
    - PulsarOffsetBackingStoreTest
    - KafkaConnectSourceTest
    - KafkaConnectSinkTest
    
    ### Does this pull request potentially affect one of the following parts:
    
    
      - The public API: `SourceContext` and `SinkContext` need to implement the 
`getPulsarClient` method
---
 .../apache/pulsar/functions/api/BaseContext.java   | 10 +++
 .../pulsar/functions/instance/ContextImpl.java     |  5 ++
 .../apache/pulsar/io/common/IOConfigUtilsTest.java | 11 ++++
 .../apache/pulsar/io/debezium/DebeziumSource.java  |  7 ---
 .../resources/debezium-mongodb-source-config.yaml  |  3 -
 .../resources/debezium-mysql-source-config.yaml    |  2 -
 .../resources/debezium-postgres-source-config.yaml |  2 -
 .../kafka/connect/AbstractKafkaConnectSource.java  |  2 +-
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  2 -
 .../connect/PulsarKafkaConnectSinkConfig.java      | 12 +---
 .../kafka/connect/PulsarKafkaSinkTaskContext.java  |  2 +-
 .../io/kafka/connect/PulsarKafkaWorkerConfig.java  | 11 ----
 .../io/kafka/connect/PulsarOffsetBackingStore.java | 25 +++-----
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 71 ++++++++++++----------
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 17 +++++-
 .../connect/PulsarOffsetBackingStoreTest.java      | 10 ++-
 .../io/kafka/sink/KafkaAbstractSinkTest.java       |  6 ++
 17 files changed, 105 insertions(+), 93 deletions(-)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
index 5105df7..37184a4 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.api;
 
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.slf4j.Logger;
@@ -191,4 +192,13 @@ public interface BaseContext {
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
+
+    /**
+     * Get the pulsar client.
+     *
+     * @return the instance of pulsar client
+     */
+    default PulsarClient getPulsarClient() {
+        throw new UnsupportedOperationException("not implemented");
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 3170236..afd57e5 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -487,6 +487,11 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
         }
     }
 
+    @Override
+    public PulsarClient getPulsarClient() {
+        return client;
+    }
+
     private <O> Producer<O> getProducer(String topicName, Schema<O> schema) 
throws PulsarClientException {
         Producer<O> producer;
         if (tlPublishProducers != null) {
diff --git 
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
 
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index bdb43a0..e2ec491 100644
--- 
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ 
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.common;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -203,6 +204,11 @@ public class IOConfigUtilsTest {
         public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) 
throws PulsarClientException {
             return null;
         }
+
+        @Override
+        public PulsarClient getPulsarClient() {
+            return null;
+        }
     }
 
     @Test
@@ -351,6 +357,11 @@ public class IOConfigUtilsTest {
         public CompletableFuture<Void> deleteStateAsync(String key) {
                return null;
         }
+
+        @Override
+        public PulsarClient getPulsarClient() {
+            return null;
+        }
     }
 
     @Test
diff --git 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 3da0d98..4f7cc85 100644
--- 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -78,13 +78,6 @@ public abstract class DebeziumSource extends 
KafkaConnectSource {
         // database.history : implementation class for database history.
         setConfigIfNull(config, 
HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), 
DEFAULT_HISTORY);
 
-        // database.history.pulsar.service.url, this is set as the value of 
pulsar.service.url if null.
-        String serviceUrl = (String) 
config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
-        if (serviceUrl == null) {
-            throw new IllegalArgumentException("Pulsar service URL not 
provided.");
-        }
-        setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), 
serviceUrl);
-
         String topicNamespace = topicNamespace(sourceContext);
         // topic.namespace
         setConfigIfNull(config, 
PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
diff --git 
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
 
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
index af73516..6350e20 100644
--- 
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
+++ 
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -33,6 +33,3 @@ configs:
   mongodb.password: "dbz"
   mongodb.task.id: "1"
   database.whitelist: "inventory"
-
-  ## PULSAR_SERVICE_URL_CONFIG
-  pulsar.service.url: "pulsar://127.0.0.1:6650"
\ No newline at end of file
diff --git 
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml 
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
index 7056bc1..f581700 100644
--- 
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
+++ 
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -35,8 +35,6 @@ configs:
   database.server.name: "dbserver1"
   database.whitelist: "inventory"
 
-  ## PULSAR_SERVICE_URL_CONFIG
-  pulsar.service.url: "pulsar://127.0.0.1:6650"
   database.history.pulsar.topic: "mysql-history-topic"
   offset.storage.topic: "mysql-offset-topic"
 
diff --git 
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
 
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
index e24f2e1..151b409 100644
--- 
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
+++ 
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
@@ -35,7 +35,5 @@ configs:
   database.server.name: "dbserver1"
   schema.whitelist: "inventory"
 
-  ## PULSAR_SERVICE_URL_CONFIG
-  pulsar.service.url: "pulsar://127.0.0.1:6650"
 
 
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 783451a..987f6f6 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -115,7 +115,7 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
         keyConverter.configure(config, true);
         valueConverter.configure(config, false);
 
-        offsetStore = new PulsarOffsetBackingStore();
+        offsetStore = new 
PulsarOffsetBackingStore(sourceContext.getPulsarClient());
         PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new 
PulsarKafkaWorkerConfig(stringConfig);
         offsetStore.configure(pulsarKafkaWorkerConfig);
         offsetStore.start();
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index cdd6c04b..8b60f54 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
-import static 
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
 
 @Slf4j
 public class KafkaConnectSink implements Sink<GenericObject> {
@@ -156,7 +155,6 @@ public class KafkaConnectSink implements 
Sink<GenericObject> {
 
         configs.forEach(x -> {
             x.put(OFFSET_STORAGE_TOPIC_CONFIG, 
kafkaSinkConfig.getOffsetStorageTopic());
-            x.put(PULSAR_SERVICE_URL_CONFIG, 
kafkaSinkConfig.getPulsarServiceUrl());
         });
         task = (SinkTask) taskClass.getConstructor().newInstance();
         taskContext =
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index 3037bfe..7e83ff6 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -69,16 +69,6 @@ public class PulsarKafkaConnectSinkConfig implements 
Serializable {
             help = "Pulsar topic to store offsets at.")
     private String offsetStorageTopic;
 
-    /*
-    This is used to configure PulsarOffsetBackingStore.
-    It will become unnecessary after the PulsarClient is exposed to the 
context.
-    */
-    @FieldDoc(
-            required = true,
-            defaultValue = "",
-            help = "Pulsar service URL to use for the offset store.")
-    private String pulsarServiceUrl;
-
     @FieldDoc(
             defaultValue = "true",
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> 
instead of one from Record.")
@@ -93,4 +83,4 @@ public class PulsarKafkaConnectSinkConfig implements 
Serializable {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
PulsarKafkaConnectSinkConfig.class);
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index f06329f..b04ca16 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -67,7 +67,7 @@ public class PulsarKafkaSinkTaskContext implements 
SinkTaskContext {
         this.config = config;
         this.ctx = ctx;
 
-        offsetStore = new PulsarOffsetBackingStore();
+        offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
         PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new 
PulsarKafkaWorkerConfig(config);
         offsetStore.configure(pulsarKafkaWorkerConfig);
         offsetStore.start();
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index 624c59a..a6cc725 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -37,13 +37,6 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
     public static final String OFFSET_STORAGE_TOPIC_CONFIG = 
"offset.storage.topic";
     private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "pulsar 
topic to store kafka connector offsets in";
 
-
-    /**
-     * <code>pulsar.service.url</code>
-     */
-    public static final String PULSAR_SERVICE_URL_CONFIG = 
"pulsar.service.url";
-    private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar 
service url";
-
     /**
      * <code>topic.namespace</code>
      */
@@ -56,10 +49,6 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
                 Type.STRING,
                 Importance.HIGH,
                 OFFSET_STORAGE_TOPIC_CONFIG_DOC)
-            .define(PULSAR_SERVICE_URL_CONFIG,
-                Type.STRING,
-                Importance.HIGH,
-                PULSAR_SERVICE_URL_CONFIG_DOC)
             .define(TOPIC_NAMESPACE_CONFIG,
                 Type.STRING,
                 "public/default",
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index e616e84..a248c7a 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -57,13 +57,15 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
     private Reader<byte[]> reader;
     private volatile CompletableFuture<Void> outstandingReadToEnd = null;
 
+    public PulsarOffsetBackingStore(PulsarClient client) {
+        checkArgument(client != null, "Pulsar Client must be provided");
+        this.client = client;
+    }
+
     @Override
     public void configure(WorkerConfig workerConfig) {
         this.topic = 
workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         checkArgument(!isBlank(topic), "Offset storage topic must be 
specified");
-        this.serviceUrl = 
workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
-        checkArgument(!isBlank(serviceUrl), "Pulsar service url must be 
specified at `"
-            + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`");
         this.data = new HashMap<>();
 
         log.info("Configure offset backing store on pulsar topic {} at cluster 
{}",
@@ -136,25 +138,23 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
-            log.info("Successfully created pulsar client to {}", serviceUrl);
             producer = client.newProducer(Schema.BYTES)
                 .topic(topic)
                 .create();
             log.info("Successfully created producer to produce updates to 
topic {}", topic);
+
             reader = client.newReader(Schema.BYTES)
                     .topic(topic)
                     .startMessageId(MessageId.earliest)
                 .create();
             log.info("Successfully created reader to replay updates from topic 
{}", topic);
+
             CompletableFuture<Void> endFuture = new CompletableFuture<>();
             readToEnd(endFuture);
             endFuture.join();
         } catch (PulsarClientException e) {
-            log.error("Failed to create pulsar client to cluster at {}", 
serviceUrl, e);
-            throw new RuntimeException("Failed to create pulsar client to 
cluster at " + serviceUrl, e);
+            log.error("Failed to setup pulsar producer/reader to cluster at 
{}", serviceUrl, e);
+            throw new RuntimeException("Failed to setup pulsar producer/reader 
to cluster at " + serviceUrl, e);
         }
     }
 
@@ -174,13 +174,6 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
                 log.warn("Failed to close reader", e);
             }
         }
-        if (null != client) {
-            try {
-                client.close();
-            } catch (IOException e) {
-                log.warn("Failed to close client", e);
-            }
-        }
     }
 
     @Override
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 6f4f954..2bce8f5 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -73,6 +74,8 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
 
     private Path file;
     private Map<String, Object> props;
+    private SinkContext context;
+    private PulsarClient client;
 
     @BeforeMethod
     @Override
@@ -84,13 +87,19 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
 
         props = Maps.newHashMap();
         props.put("topic", "test-topic");
-        props.put("pulsarServiceUrl", brokerUrl.toString());
         props.put("offsetStorageTopic", offsetTopicName);
         props.put("kafkaConnectorSinkClass", 
"org.apache.kafka.connect.file.FileStreamSinkConnector");
 
         Map<String, String> kafkaConnectorProps = Maps.newHashMap();
         kafkaConnectorProps.put("file", file.toString());
         props.put("kafkaConnectorConfigProperties", kafkaConnectorProps);
+
+        this.context = mock(SinkContext.class);
+        this.client = PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build();
+        
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
+        when(context.getPulsarClient()).thenReturn(client);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -100,15 +109,17 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
             Files.delete(file);
         }
 
+        if (this.client != null) {
+            client.close();
+        }
+
         super.internalCleanup();
     }
 
     @Test
     public void smokeTest() throws Exception {
         KafkaConnectSink sink = new KafkaConnectSink();
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
-        sink.open(props, mockCtx);
+        sink.open(props, context);
 
         final GenericRecord rec = getGenericRecord("value", Schema.STRING);
         Message msg = mock(MessageImpl.class);
@@ -138,9 +149,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
     @Test
     public void seekPauseResumeTest() throws Exception {
         KafkaConnectSink sink = new KafkaConnectSink();
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
-        sink.open(props, mockCtx);
+        sink.open(props, context);
 
         final GenericRecord rec = getGenericRecord("value", Schema.STRING);
         Message msg = mock(MessageImpl.class);
@@ -167,13 +176,13 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         assertEquals(MessageIdUtils.getOffset(msgId), 
sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.offset(tp, 0);
-        verify(mockCtx, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), 
any());
+        verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), 
any());
         assertEquals(0, sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.pause(tp);
-        verify(mockCtx, times(1)).pause(tp.topic(), tp.partition());
+        verify(context, times(1)).pause(tp.topic(), tp.partition());
         sink.taskContext.resume(tp);
-        verify(mockCtx, times(1)).resume(tp.topic(), tp.partition());
+        verify(context, times(1)).resume(tp.topic(), tp.partition());
 
         sink.close();
     }
@@ -181,41 +190,39 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
 
     @Test
     public void subscriptionTypeTest() throws Exception {
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
-            log.info("Exclusive is allowed");
-            sink.open(props, mockCtx);
+            log.info("Failover is allowed");
+            sink.open(props, context);
         }
 
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
+        
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
-            log.info("Failover is allowed");
-            sink.open(props, mockCtx);
+            log.info("Exclusive is allowed");
+            sink.open(props, context);
         }
 
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Key_Shared);
+        
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Key_Shared);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
             log.info("Key_Shared is not allowed");
-            sink.open(props, mockCtx);
+            sink.open(props, context);
             fail("expected exception");
         } catch (IllegalArgumentException iae) {
             // pass
         }
 
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Shared);
+        
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Shared);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
             log.info("Shared is not allowed");
-            sink.open(props, mockCtx);
+            sink.open(props, context);
             fail("expected exception");
         } catch (IllegalArgumentException iae) {
             // pass
         }
 
-        when(mockCtx.getSubscriptionType()).thenReturn(null);
+        when(context.getSubscriptionType()).thenReturn(null);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
             log.info("Type is required");
-            sink.open(props, mockCtx);
+            sink.open(props, context);
             fail("expected exception");
         } catch (IllegalArgumentException iae) {
             // pass
@@ -232,9 +239,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         props.put("kafkaConnectorSinkClass", 
SchemaedFileStreamSinkConnector.class.getCanonicalName());
 
         KafkaConnectSink sink = new KafkaConnectSink();
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
-        sink.open(props, mockCtx);
+        sink.open(props, context);
 
         final GenericRecord rec = getGenericRecord(value, schema);
         Message msg = mock(MessageImpl.class);
@@ -357,9 +362,7 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
         props.put("kafkaConnectorSinkClass", 
SchemaedFileStreamSinkConnector.class.getCanonicalName());
 
         KafkaConnectSink sink = new KafkaConnectSink();
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
-        sink.open(props, mockCtx);
+        sink.open(props, context);
 
         final GenericRecord rec = getGenericRecord(obj, null);
         Message msg = mock(MessageImpl.class);
@@ -411,9 +414,8 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
                 .build();
 
         KafkaConnectSink sink = new KafkaConnectSink();
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
-        sink.open(props, mockCtx);
+        
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+        sink.open(props, context);
 
         // offset is -1 before any data is written (aka no offset)
         assertEquals(-1L, sink.currentOffset(topicName, partition));
@@ -433,7 +435,10 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase  {
 
         // close the producer, open again
         sink = new KafkaConnectSink();
-        sink.open(props, mockCtx);
+        when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build());
+        sink.open(props, context);
 
         // offset is 1 after reopening the producer
         assertEquals(1, sink.currentOffset(topicName, partition));
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 98b2618..26ecf21 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -31,8 +33,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.file.FileStreamSourceConnector;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -49,6 +53,8 @@ public class KafkaConnectSourceTest extends 
ProducerConsumerBase  {
     private String topicName;
     private KafkaConnectSource kafkaConnectSource;
     private File tempFile;
+    private SourceContext context;
+    private PulsarClient client;
 
     @BeforeMethod
     @Override
@@ -61,7 +67,6 @@ public class KafkaConnectSourceTest extends 
ProducerConsumerBase  {
         config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
 
         this.offsetTopicName = 
"persistent://my-property/my-ns/kafka-connect-source-offset";
-        config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, 
brokerUrl.toString());
         config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
offsetTopicName);
 
         this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
@@ -70,11 +75,19 @@ public class KafkaConnectSourceTest extends 
ProducerConsumerBase  {
         config.put(FileStreamSourceConnector.FILE_CONFIG, 
tempFile.getAbsoluteFile().toString());
         config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
 
+        this.context = mock(SourceContext.class);
+        this.client = PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build();
+        when(context.getPulsarClient()).thenReturn(this.client);
     }
 
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
+        if (this.client != null) {
+            this.client.close();
+        }
         tempFile.delete();
         super.internalCleanup();
     }
@@ -89,7 +102,7 @@ public class KafkaConnectSourceTest extends 
ProducerConsumerBase  {
     @Test
     public void testOpenAndRead() throws Exception {
         kafkaConnectSource = new KafkaConnectSource();
-        kafkaConnectSource.open(config, null);
+        kafkaConnectSource.open(config, context);
 
         // use FileStreamSourceConnector, each line is a record, need "\n" and 
end of each record.
         OutputStream os = Files.newOutputStream(tempFile.toPath());
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index fc338c7..8d8b754 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -36,10 +36,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Test the implementation of {@link PulsarOffsetBackingStore}.
  */
@@ -50,6 +53,7 @@ public class PulsarOffsetBackingStoreTest extends 
ProducerConsumerBase {
     private PulsarKafkaWorkerConfig distributedConfig;
     private String topicName;
     private PulsarOffsetBackingStore offsetBackingStore;
+    private PulsarClient client;
 
     @BeforeMethod
     @Override
@@ -58,10 +62,12 @@ public class PulsarOffsetBackingStoreTest extends 
ProducerConsumerBase {
         super.producerBaseSetup();
 
         this.topicName = "persistent://my-property/my-ns/offset-topic";
-        
this.defaultProps.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, 
brokerUrl.toString());
         
this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, 
topicName);
         this.distributedConfig = new 
PulsarKafkaWorkerConfig(this.defaultProps);
-        this.offsetBackingStore = new PulsarOffsetBackingStore();
+        this.client = PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build();
+        this.offsetBackingStore = new PulsarOffsetBackingStore(client);
         this.offsetBackingStore.configure(distributedConfig);
         this.offsetBackingStore.start();
     }
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index 5baf233..f5fe46b 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.sink;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SinkContext;
@@ -167,6 +168,11 @@ public class KafkaAbstractSinkTest {
             public CompletableFuture<Void> deleteStateAsync(String key) {
                return null;
             }
+
+            @Override
+            public PulsarClient getPulsarClient() {
+                return null;
+            }
         };
         ThrowingRunnable openAndClose = ()->{
             try {

Reply via email to