This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3939bce PIP-85: [pulsar-io] pass pulsar client via context to
connector (#11056)
3939bce is described below
commit 3939bce49ff7bc50f7a83910ff77e88c1b31cc39
Author: Neng Lu <[email protected]>
AuthorDate: Fri Jul 2 11:53:49 2021 -0700
PIP-85: [pulsar-io] pass pulsar client via context to connector (#11056)
### Motivation
Fixes #8668
### Modifications
Expose `PulsarClient` via `BaseContext`, and allow connectors to use the
Pulsar client inherited from the function worker to produce/consume messages.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is already covered by existing tests, such as:
- PulsarOffsetBackingStoreTest
- KafkaConnectSourceTest
- KafkaConnectSinkTest
### Does this pull request potentially affect one of the following parts:
- The public API: `SourceContext` and `SinkContext` need to implement the
`getPulsarClient` method
---
.../apache/pulsar/functions/api/BaseContext.java | 10 +++
.../pulsar/functions/instance/ContextImpl.java | 5 ++
.../apache/pulsar/io/common/IOConfigUtilsTest.java | 11 ++++
.../apache/pulsar/io/debezium/DebeziumSource.java | 7 ---
.../resources/debezium-mongodb-source-config.yaml | 3 -
.../resources/debezium-mysql-source-config.yaml | 2 -
.../resources/debezium-postgres-source-config.yaml | 2 -
.../kafka/connect/AbstractKafkaConnectSource.java | 2 +-
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 2 -
.../connect/PulsarKafkaConnectSinkConfig.java | 12 +---
.../kafka/connect/PulsarKafkaSinkTaskContext.java | 2 +-
.../io/kafka/connect/PulsarKafkaWorkerConfig.java | 11 ----
.../io/kafka/connect/PulsarOffsetBackingStore.java | 25 +++-----
.../io/kafka/connect/KafkaConnectSinkTest.java | 71 ++++++++++++----------
.../io/kafka/connect/KafkaConnectSourceTest.java | 17 +++++-
.../connect/PulsarOffsetBackingStoreTest.java | 10 ++-
.../io/kafka/sink/KafkaAbstractSinkTest.java | 6 ++
17 files changed, 105 insertions(+), 93 deletions(-)
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
index 5105df7..37184a4 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.api;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.slf4j.Logger;
@@ -191,4 +192,13 @@ public interface BaseContext {
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
+
+ /**
+ * Get the pulsar client.
+ *
+ * @return the instance of pulsar client
+ */
+ default PulsarClient getPulsarClient() {
+ throw new UnsupportedOperationException("not implemented");
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 3170236..afd57e5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -487,6 +487,11 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
}
}
+ @Override
+ public PulsarClient getPulsarClient() {
+ return client;
+ }
+
private <O> Producer<O> getProducer(String topicName, Schema<O> schema)
throws PulsarClientException {
Producer<O> producer;
if (tlPublishProducers != null) {
diff --git
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index bdb43a0..e2ec491 100644
---
a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++
b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.common;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -203,6 +204,11 @@ public class IOConfigUtilsTest {
public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema)
throws PulsarClientException {
return null;
}
+
+ @Override
+ public PulsarClient getPulsarClient() {
+ return null;
+ }
}
@Test
@@ -351,6 +357,11 @@ public class IOConfigUtilsTest {
public CompletableFuture<Void> deleteStateAsync(String key) {
return null;
}
+
+ @Override
+ public PulsarClient getPulsarClient() {
+ return null;
+ }
}
@Test
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 3da0d98..4f7cc85 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -78,13 +78,6 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
// database.history : implementation class for database history.
setConfigIfNull(config,
HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(),
DEFAULT_HISTORY);
- // database.history.pulsar.service.url, this is set as the value of
pulsar.service.url if null.
- String serviceUrl = (String)
config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
- if (serviceUrl == null) {
- throw new IllegalArgumentException("Pulsar service URL not
provided.");
- }
- setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(),
serviceUrl);
-
String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
setConfigIfNull(config,
PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
diff --git
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
index af73516..6350e20 100644
---
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
+++
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -33,6 +33,3 @@ configs:
mongodb.password: "dbz"
mongodb.task.id: "1"
database.whitelist: "inventory"
-
- ## PULSAR_SERVICE_URL_CONFIG
- pulsar.service.url: "pulsar://127.0.0.1:6650"
\ No newline at end of file
diff --git
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
index 7056bc1..f581700 100644
---
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
+++
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -35,8 +35,6 @@ configs:
database.server.name: "dbserver1"
database.whitelist: "inventory"
- ## PULSAR_SERVICE_URL_CONFIG
- pulsar.service.url: "pulsar://127.0.0.1:6650"
database.history.pulsar.topic: "mysql-history-topic"
offset.storage.topic: "mysql-offset-topic"
diff --git
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
index e24f2e1..151b409 100644
---
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
+++
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
@@ -35,7 +35,5 @@ configs:
database.server.name: "dbserver1"
schema.whitelist: "inventory"
- ## PULSAR_SERVICE_URL_CONFIG
- pulsar.service.url: "pulsar://127.0.0.1:6650"
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 783451a..987f6f6 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -115,7 +115,7 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
keyConverter.configure(config, true);
valueConverter.configure(config, false);
- offsetStore = new PulsarOffsetBackingStore();
+ offsetStore = new
PulsarOffsetBackingStore(sourceContext.getPulsarClient());
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new
PulsarKafkaWorkerConfig(stringConfig);
offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index cdd6c04b..8b60f54 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
-import static
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
@Slf4j
public class KafkaConnectSink implements Sink<GenericObject> {
@@ -156,7 +155,6 @@ public class KafkaConnectSink implements
Sink<GenericObject> {
configs.forEach(x -> {
x.put(OFFSET_STORAGE_TOPIC_CONFIG,
kafkaSinkConfig.getOffsetStorageTopic());
- x.put(PULSAR_SERVICE_URL_CONFIG,
kafkaSinkConfig.getPulsarServiceUrl());
});
task = (SinkTask) taskClass.getConstructor().newInstance();
taskContext =
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index 3037bfe..7e83ff6 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -69,16 +69,6 @@ public class PulsarKafkaConnectSinkConfig implements
Serializable {
help = "Pulsar topic to store offsets at.")
private String offsetStorageTopic;
- /*
- This is used to configure PulsarOffsetBackingStore.
- It will become unnecessary after the PulsarClient is exposed to the
context.
- */
- @FieldDoc(
- required = true,
- defaultValue = "",
- help = "Pulsar service URL to use for the offset store.")
- private String pulsarServiceUrl;
-
@FieldDoc(
defaultValue = "true",
help = "In case of Record<KeyValue<>> data use key from KeyValue<>
instead of one from Record.")
@@ -93,4 +83,4 @@ public class PulsarKafkaConnectSinkConfig implements
Serializable {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(new ObjectMapper().writeValueAsString(map),
PulsarKafkaConnectSinkConfig.class);
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index f06329f..b04ca16 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -67,7 +67,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
this.config = config;
this.ctx = ctx;
- offsetStore = new PulsarOffsetBackingStore();
+ offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new
PulsarKafkaWorkerConfig(config);
offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index 624c59a..a6cc725 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -37,13 +37,6 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
public static final String OFFSET_STORAGE_TOPIC_CONFIG =
"offset.storage.topic";
private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "pulsar
topic to store kafka connector offsets in";
-
- /**
- * <code>pulsar.service.url</code>
- */
- public static final String PULSAR_SERVICE_URL_CONFIG =
"pulsar.service.url";
- private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar
service url";
-
/**
* <code>topic.namespace</code>
*/
@@ -56,10 +49,6 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
Type.STRING,
Importance.HIGH,
OFFSET_STORAGE_TOPIC_CONFIG_DOC)
- .define(PULSAR_SERVICE_URL_CONFIG,
- Type.STRING,
- Importance.HIGH,
- PULSAR_SERVICE_URL_CONFIG_DOC)
.define(TOPIC_NAMESPACE_CONFIG,
Type.STRING,
"public/default",
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index e616e84..a248c7a 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -57,13 +57,15 @@ public class PulsarOffsetBackingStore implements
OffsetBackingStore {
private Reader<byte[]> reader;
private volatile CompletableFuture<Void> outstandingReadToEnd = null;
+ public PulsarOffsetBackingStore(PulsarClient client) {
+ checkArgument(client != null, "Pulsar Client must be provided");
+ this.client = client;
+ }
+
@Override
public void configure(WorkerConfig workerConfig) {
this.topic =
workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
checkArgument(!isBlank(topic), "Offset storage topic must be
specified");
- this.serviceUrl =
workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
- checkArgument(!isBlank(serviceUrl), "Pulsar service url must be
specified at `"
- + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`");
this.data = new HashMap<>();
log.info("Configure offset backing store on pulsar topic {} at cluster
{}",
@@ -136,25 +138,23 @@ public class PulsarOffsetBackingStore implements
OffsetBackingStore {
@Override
public void start() {
try {
- client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
- log.info("Successfully created pulsar client to {}", serviceUrl);
producer = client.newProducer(Schema.BYTES)
.topic(topic)
.create();
log.info("Successfully created producer to produce updates to
topic {}", topic);
+
reader = client.newReader(Schema.BYTES)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
log.info("Successfully created reader to replay updates from topic
{}", topic);
+
CompletableFuture<Void> endFuture = new CompletableFuture<>();
readToEnd(endFuture);
endFuture.join();
} catch (PulsarClientException e) {
- log.error("Failed to create pulsar client to cluster at {}",
serviceUrl, e);
- throw new RuntimeException("Failed to create pulsar client to
cluster at " + serviceUrl, e);
+ log.error("Failed to setup pulsar producer/reader to cluster at
{}", serviceUrl, e);
+ throw new RuntimeException("Failed to setup pulsar producer/reader
to cluster at " + serviceUrl, e);
}
}
@@ -174,13 +174,6 @@ public class PulsarOffsetBackingStore implements
OffsetBackingStore {
log.warn("Failed to close reader", e);
}
}
- if (null != client) {
- try {
- client.close();
- } catch (IOException e) {
- log.warn("Failed to close client", e);
- }
- }
}
@Override
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 6f4f954..2bce8f5 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -73,6 +74,8 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
private Path file;
private Map<String, Object> props;
+ private SinkContext context;
+ private PulsarClient client;
@BeforeMethod
@Override
@@ -84,13 +87,19 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
props = Maps.newHashMap();
props.put("topic", "test-topic");
- props.put("pulsarServiceUrl", brokerUrl.toString());
props.put("offsetStorageTopic", offsetTopicName);
props.put("kafkaConnectorSinkClass",
"org.apache.kafka.connect.file.FileStreamSinkConnector");
Map<String, String> kafkaConnectorProps = Maps.newHashMap();
kafkaConnectorProps.put("file", file.toString());
props.put("kafkaConnectorConfigProperties", kafkaConnectorProps);
+
+ this.context = mock(SinkContext.class);
+ this.client = PulsarClient.builder()
+ .serviceUrl(brokerUrl.toString())
+ .build();
+
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
+ when(context.getPulsarClient()).thenReturn(client);
}
@AfterMethod(alwaysRun = true)
@@ -100,15 +109,17 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
Files.delete(file);
}
+ if (this.client != null) {
+ client.close();
+ }
+
super.internalCleanup();
}
@Test
public void smokeTest() throws Exception {
KafkaConnectSink sink = new KafkaConnectSink();
- SinkContext mockCtx = Mockito.mock(SinkContext.class);
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
- sink.open(props, mockCtx);
+ sink.open(props, context);
final GenericRecord rec = getGenericRecord("value", Schema.STRING);
Message msg = mock(MessageImpl.class);
@@ -138,9 +149,7 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
@Test
public void seekPauseResumeTest() throws Exception {
KafkaConnectSink sink = new KafkaConnectSink();
- SinkContext mockCtx = Mockito.mock(SinkContext.class);
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
- sink.open(props, mockCtx);
+ sink.open(props, context);
final GenericRecord rec = getGenericRecord("value", Schema.STRING);
Message msg = mock(MessageImpl.class);
@@ -167,13 +176,13 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
assertEquals(MessageIdUtils.getOffset(msgId),
sink.currentOffset(tp.topic(), tp.partition()));
sink.taskContext.offset(tp, 0);
- verify(mockCtx, times(1)).seek(Mockito.anyString(), Mockito.anyInt(),
any());
+ verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(),
any());
assertEquals(0, sink.currentOffset(tp.topic(), tp.partition()));
sink.taskContext.pause(tp);
- verify(mockCtx, times(1)).pause(tp.topic(), tp.partition());
+ verify(context, times(1)).pause(tp.topic(), tp.partition());
sink.taskContext.resume(tp);
- verify(mockCtx, times(1)).resume(tp.topic(), tp.partition());
+ verify(context, times(1)).resume(tp.topic(), tp.partition());
sink.close();
}
@@ -181,41 +190,39 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
@Test
public void subscriptionTypeTest() throws Exception {
- SinkContext mockCtx = Mockito.mock(SinkContext.class);
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
try (KafkaConnectSink sink = new KafkaConnectSink()) {
- log.info("Exclusive is allowed");
- sink.open(props, mockCtx);
+ log.info("Failover is allowed");
+ sink.open(props, context);
}
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
+
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
try (KafkaConnectSink sink = new KafkaConnectSink()) {
- log.info("Failover is allowed");
- sink.open(props, mockCtx);
+ log.info("Exclusive is allowed");
+ sink.open(props, context);
}
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Key_Shared);
+
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Key_Shared);
try (KafkaConnectSink sink = new KafkaConnectSink()) {
log.info("Key_Shared is not allowed");
- sink.open(props, mockCtx);
+ sink.open(props, context);
fail("expected exception");
} catch (IllegalArgumentException iae) {
// pass
}
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Shared);
+
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Shared);
try (KafkaConnectSink sink = new KafkaConnectSink()) {
log.info("Shared is not allowed");
- sink.open(props, mockCtx);
+ sink.open(props, context);
fail("expected exception");
} catch (IllegalArgumentException iae) {
// pass
}
- when(mockCtx.getSubscriptionType()).thenReturn(null);
+ when(context.getSubscriptionType()).thenReturn(null);
try (KafkaConnectSink sink = new KafkaConnectSink()) {
log.info("Type is required");
- sink.open(props, mockCtx);
+ sink.open(props, context);
fail("expected exception");
} catch (IllegalArgumentException iae) {
// pass
@@ -232,9 +239,7 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
props.put("kafkaConnectorSinkClass",
SchemaedFileStreamSinkConnector.class.getCanonicalName());
KafkaConnectSink sink = new KafkaConnectSink();
- SinkContext mockCtx = Mockito.mock(SinkContext.class);
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
- sink.open(props, mockCtx);
+ sink.open(props, context);
final GenericRecord rec = getGenericRecord(value, schema);
Message msg = mock(MessageImpl.class);
@@ -357,9 +362,7 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
props.put("kafkaConnectorSinkClass",
SchemaedFileStreamSinkConnector.class.getCanonicalName());
KafkaConnectSink sink = new KafkaConnectSink();
- SinkContext mockCtx = Mockito.mock(SinkContext.class);
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
- sink.open(props, mockCtx);
+ sink.open(props, context);
final GenericRecord rec = getGenericRecord(obj, null);
Message msg = mock(MessageImpl.class);
@@ -411,9 +414,8 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
.build();
KafkaConnectSink sink = new KafkaConnectSink();
- SinkContext mockCtx = Mockito.mock(SinkContext.class);
-
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
- sink.open(props, mockCtx);
+
when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+ sink.open(props, context);
// offset is -1 before any data is written (aka no offset)
assertEquals(-1L, sink.currentOffset(topicName, partition));
@@ -433,7 +435,10 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
// close the producer, open again
sink = new KafkaConnectSink();
- sink.open(props, mockCtx);
+ when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+ .serviceUrl(brokerUrl.toString())
+ .build());
+ sink.open(props, context);
// offset is 1 after reopening the producer
assertEquals(1, sink.currentOffset(topicName, partition));
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 98b2618..26ecf21 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.kafka.connect;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -31,8 +33,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -49,6 +53,8 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
private String topicName;
private KafkaConnectSource kafkaConnectSource;
private File tempFile;
+ private SourceContext context;
+ private PulsarClient client;
@BeforeMethod
@Override
@@ -61,7 +67,6 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.storage.StringConverter");
this.offsetTopicName =
"persistent://my-property/my-ns/kafka-connect-source-offset";
- config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG,
brokerUrl.toString());
config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
offsetTopicName);
this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
@@ -70,11 +75,19 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
config.put(FileStreamSourceConnector.FILE_CONFIG,
tempFile.getAbsoluteFile().toString());
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG,
String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+ this.context = mock(SourceContext.class);
+ this.client = PulsarClient.builder()
+ .serviceUrl(brokerUrl.toString())
+ .build();
+ when(context.getPulsarClient()).thenReturn(this.client);
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
+ if (this.client != null) {
+ this.client.close();
+ }
tempFile.delete();
super.internalCleanup();
}
@@ -89,7 +102,7 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
@Test
public void testOpenAndRead() throws Exception {
kafkaConnectSource = new KafkaConnectSource();
- kafkaConnectSource.open(config, null);
+ kafkaConnectSource.open(config, context);
// use FileStreamSourceConnector, each line is a record, need "\n" and
end of each record.
OutputStream os = Files.newOutputStream(tempFile.toPath());
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index fc338c7..8d8b754 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -36,10 +36,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+
/**
* Test the implementation of {@link PulsarOffsetBackingStore}.
*/
@@ -50,6 +53,7 @@ public class PulsarOffsetBackingStoreTest extends
ProducerConsumerBase {
private PulsarKafkaWorkerConfig distributedConfig;
private String topicName;
private PulsarOffsetBackingStore offsetBackingStore;
+ private PulsarClient client;
@BeforeMethod
@Override
@@ -58,10 +62,12 @@ public class PulsarOffsetBackingStoreTest extends
ProducerConsumerBase {
super.producerBaseSetup();
this.topicName = "persistent://my-property/my-ns/offset-topic";
-
this.defaultProps.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG,
brokerUrl.toString());
this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicName);
this.distributedConfig = new
PulsarKafkaWorkerConfig(this.defaultProps);
- this.offsetBackingStore = new PulsarOffsetBackingStore();
+ this.client = PulsarClient.builder()
+ .serviceUrl(brokerUrl.toString())
+ .build();
+ this.offsetBackingStore = new PulsarOffsetBackingStore(client);
this.offsetBackingStore.configure(distributedConfig);
this.offsetBackingStore.start();
}
diff --git
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index 5baf233..f5fe46b 100644
---
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.kafka.sink;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.SinkContext;
@@ -167,6 +168,11 @@ public class KafkaAbstractSinkTest {
public CompletableFuture<Void> deleteStateAsync(String key) {
return null;
}
+
+ @Override
+ public PulsarClient getPulsarClient() {
+ return null;
+ }
};
ThrowingRunnable openAndClose = ()->{
try {