This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new ed771af [Cherry-pick] Pass client builder to debezium database
history (#12112)
ed771af is described below
commit ed771af858e0d4f781ef40fe7005f299fd8168c3
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Jan 25 11:22:56 2022 +0800
[Cherry-pick] Pass client builder to debezium database history (#12112)
Cherry pick #11293
## Motivation
Debezium requires a Pulsar service URL for its database history storage.
As of #11056, the service.url field from PulsarKafkaWorkerConfig is no longer
available, and its value was also deleted from multiple YAML config files in
that change. This causes the integration test for the Debezium connector to fail.
Based on the Debezium paradigm, all configurations should be passed as
strings. There's no easy way to inject a PulsarClient via configuration.
We need to ask the user to provide the Pulsar service URL explicitly, and
probably the auth info as well.
## Modifications
Make the database.history.pulsar.service.url field required
Add the config value back to example yaml files
Update the integration test config
---
.../apache/pulsar/client/api/ClientBuilder.java | 3 +-
.../apache/pulsar/functions/api/BaseContext.java | 13 ++++
.../pulsar/functions/instance/ContextImpl.java | 11 +++-
.../pulsar/functions/instance/InstanceUtils.java | 30 +++++----
.../functions/instance/JavaInstanceRunnable.java | 13 ++--
.../pulsar/functions/instance/ContextImplTest.java | 24 +++++---
.../instance/JavaInstanceRunnableTest.java | 12 +++-
.../functions/runtime/thread/ThreadRuntime.java | 9 ++-
.../runtime/thread/ThreadRuntimeFactory.java | 5 +-
.../worker/FunctionRuntimeManagerTest.java | 27 ++++----
.../worker/rest/api/FunctionsImplTest.java | 7 ++-
.../apache/pulsar/io/debezium/DebeziumSource.java | 3 +
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 44 ++++++++++----
.../org/apache/pulsar/io/debezium/SerDeUtils.java | 71 ++++++++++++++++++++++
.../io/debezium/PulsarDatabaseHistoryTest.java | 47 +++++++++-----
15 files changed, 242 insertions(+), 77 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index aaa378f..d371bc3 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import java.io.Serializable;
import java.time.Clock;
import java.util.Map;
import java.util.Set;
@@ -33,7 +34,7 @@ import
org.apache.pulsar.common.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public interface ClientBuilder extends Cloneable {
+public interface ClientBuilder extends Serializable, Cloneable {
/**
* Construct the final {@link PulsarClient} instance.
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
index 5105df7..d2d0b3e 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.api;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.slf4j.Logger;
@@ -191,4 +192,16 @@ public interface BaseContext {
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
+
+ /**
+ * Get the pre-configured pulsar client builder.
+ *
+ * You can use this Builder to setup client to connect to the Pulsar
cluster.
+ * But you need to close client properly after using it.
+ *
+ * @return the instance of pulsar client builder.
+ */
+ default ClientBuilder getPulsarClientBuilder() {
+ throw new UnsupportedOperationException("not implemented");
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index f3438c4..b171747 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -40,6 +40,7 @@ import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -89,6 +90,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
// Per Message related
private Record<?> record;
+ private final ClientBuilder clientBuilder;
private final PulsarClient client;
private final PulsarAdmin pulsarAdmin;
private Map<String, Producer<?>> publishProducers;
@@ -133,9 +135,11 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType,
ComponentStatsManager statsManager,
- StateManager stateManager, PulsarAdmin pulsarAdmin) {
+ StateManager stateManager, PulsarAdmin pulsarAdmin,
ClientBuilder
+ clientBuilder) throws PulsarClientException {
this.config = config;
this.logger = logger;
+ this.clientBuilder = clientBuilder;
this.client = client;
this.pulsarAdmin = pulsarAdmin;
this.topicSchema = new TopicSchema(client);
@@ -496,6 +500,11 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
}
}
+ @Override
+ public ClientBuilder getPulsarClientBuilder() {
+ return clientBuilder;
+ }
+
private <O> Producer<O> getProducer(String topicName, Schema<O> schema)
throws PulsarClientException {
Producer<O> producer;
if (tlPublishProducers != null) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 5501a31..b72e6c3 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -156,15 +156,9 @@ public class InstanceUtils {
return properties;
}
-
- public static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig)
- throws PulsarClientException {
- return createPulsarClient(pulsarServiceUrl, authConfig,
Optional.empty());
- }
-
- public static PulsarClient createPulsarClient(String pulsarServiceUrl,
- AuthenticationConfig
authConfig,
- Optional<Long> memoryLimit)
throws PulsarClientException {
+ public static ClientBuilder createPulsarClientBuilder(String
pulsarServiceUrl,
+ AuthenticationConfig
authConfig,
+ Optional<Long>
memoryLimit) throws PulsarClientException {
ClientBuilder clientBuilder = null;
if (isNotBlank(pulsarServiceUrl)) {
clientBuilder =
PulsarClient.builder().serviceUrl(pulsarServiceUrl);
@@ -183,10 +177,22 @@ public class InstanceUtils {
clientBuilder.memoryLimit(memoryLimit.get(), SizeUnit.BYTES);
}
clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
- return clientBuilder.build();
+ return clientBuilder;
}
- log.warn("pulsarServiceUrl cannot be null");
- return null;
+ throw new PulsarClientException("pulsarServiceUrl cannot be null");
+ }
+
+
+
+ public static PulsarClient createPulsarClient(String pulsarServiceUrl,
AuthenticationConfig authConfig)
+ throws PulsarClientException {
+ return createPulsarClient(pulsarServiceUrl, authConfig,
Optional.empty());
+ }
+
+ public static PulsarClient createPulsarClient(String pulsarServiceUrl,
+ AuthenticationConfig
authConfig,
+ Optional<Long> memoryLimit)
throws PulsarClientException {
+ return createPulsarClientBuilder(pulsarServiceUrl, authConfig,
memoryLimit).build();
}
public static PulsarAdmin createPulsarAdminClient(String
pulsarWebServiceUrl, AuthenticationConfig authConfig)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index f6ca84d..342806e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -40,7 +40,9 @@ import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -91,7 +93,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private final InstanceConfig instanceConfig;
// input topic consumer & output topic producer
- private final PulsarClientImpl client;
+ private final ClientBuilder clientBuilder;
+ private PulsarClientImpl client;
private final PulsarAdmin pulsarAdmin;
private LogAppender logAppender;
@@ -135,13 +138,15 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private ReadWriteLock statsLock = new ReentrantReadWriteLock();
public JavaInstanceRunnable(InstanceConfig instanceConfig,
+ ClientBuilder clientBuilder,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
- ClassLoader functionClassLoader) {
+ ClassLoader functionClassLoader) throws
PulsarClientException {
this.instanceConfig = instanceConfig;
+ this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
@@ -227,12 +232,12 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
isInitialized = true;
}
- ContextImpl setupContext() {
+ ContextImpl setupContext() throws PulsarClientException {
Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client,
secretsProvider,
collectorRegistry, metricsLabels, this.componentType,
this.stats, stateManager,
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
}
/**
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 5bb61fd..c959d01 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.instance;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -74,13 +75,14 @@ public class ContextImplTest {
private InstanceConfig config;
private Logger logger;
+ private ClientBuilder clientBuilder;
private PulsarClientImpl client;
private PulsarAdmin pulsarAdmin;
private ContextImpl context;
private Producer producer = mock(Producer.class);
@BeforeMethod
- public void setup() {
+ public void setup() throws PulsarClientException {
config = new InstanceConfig();
config.setExposePulsarAdminClientEnabled(true);
FunctionDetails functionDetails = FunctionDetails.newBuilder()
@@ -90,12 +92,14 @@ public class ContextImplTest {
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
pulsarAdmin = mock(PulsarAdmin.class);
+ client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client,
Schema.BYTES));
when(client.createProducerAsync(any(ProducerConfigurationData.class),
any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
-
+ clientBuilder = mock(ClientBuilder.class);
+ when(clientBuilder.build()).thenReturn(client);
TypedMessageBuilder messageBuilder = spy(new
TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);
@@ -105,7 +109,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
context.setCurrentMessageContext((Record<String>) () -> null);
}
@@ -190,7 +194,7 @@ public class ContextImplTest {
}
@Test(expectedExceptions = IllegalStateException.class)
- public void testGetPulsarAdminWithExposePulsarAdminDisabled() {
+ public void testGetPulsarAdminWithExposePulsarAdminDisabled() throws
PulsarClientException {
config.setExposePulsarAdminClientEnabled(false);
context = new ContextImpl(
config,
@@ -198,12 +202,12 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
context.getPulsarAdmin();
}
@Test
- public void testUnsupportedExtendedSinkContext(){
+ public void testUnsupportedExtendedSinkContext() throws
PulsarClientException {
config.setExposePulsarAdminClientEnabled(false);
context = new ContextImpl(
config,
@@ -211,7 +215,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
try {
context.seek("z", 0, Mockito.mock(MessageId.class));
Assert.fail("Expected exception");
@@ -241,7 +245,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -272,7 +276,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -295,7 +299,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(),
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null, new
InstanceStateManager(),
- pulsarAdmin);
+ pulsarAdmin, clientBuilder);
ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 220802f..4d143c0 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
@@ -37,6 +38,9 @@ import org.testng.annotations.Test;
import java.lang.reflect.Method;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
public class JavaInstanceRunnableTest {
static class IntegerSerDe implements SerDe<Integer> {
@@ -64,8 +68,10 @@ public class JavaInstanceRunnableTest {
private JavaInstanceRunnable createRunnable(String outputSerde) throws
Exception {
InstanceConfig config = createInstanceConfig(outputSerde);
+ ClientBuilder clientBuilder = mock(ClientBuilder.class);
+ when(clientBuilder.build()).thenReturn(null);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, null, null, null, null, null, null);
+ config, clientBuilder,null, null, null, null, null, null);
return javaInstanceRunnable;
}
@@ -126,7 +132,7 @@ public class JavaInstanceRunnableTest {
@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
- SinkSpecOrBuilder sinkSpec = Mockito.mock(SinkSpecOrBuilder.class);
+ SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
Mockito.when(sinkSpec.getConfigs()).thenReturn("{\"ttl\":
9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sinkSpec.getConfigs(), new
TypeReference<Map<String, Object>>() {});
@@ -136,7 +142,7 @@ public class JavaInstanceRunnableTest {
@Test
public void testSourceConfigParsingPreservesOriginalType() throws
Exception {
- SourceSpecOrBuilder sourceSpec =
Mockito.mock(SourceSpecOrBuilder.class);
+ SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
Mockito.when(sourceSpec.getConfigs()).thenReturn("{\"ttl\":
9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sourceSpec.getConfigs(), new
TypeReference<Map<String, Object>>() {});
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 474410c..279298f 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -31,6 +31,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -62,6 +63,7 @@ public class ThreadRuntime implements Runtime {
private ThreadGroup threadGroup;
private FunctionCacheManager fnCache;
private String jarFile;
+ private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private String stateStorageServiceUrl;
@@ -74,7 +76,8 @@ public class ThreadRuntime implements Runtime {
FunctionCacheManager fnCache,
ThreadGroup threadGroup,
String jarFile,
- PulsarClient pulsarClient,
+ PulsarClient client,
+ ClientBuilder clientBuilder,
PulsarAdmin pulsarAdmin,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
@@ -89,7 +92,8 @@ public class ThreadRuntime implements Runtime {
this.threadGroup = threadGroup;
this.fnCache = fnCache;
this.jarFile = jarFile;
- this.pulsarClient = pulsarClient;
+ this.clientBuilder = clientBuilder;
+ this.pulsarClient = client;
this.pulsarAdmin = pulsarAdmin;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
@@ -167,6 +171,7 @@ public class ThreadRuntime implements Runtime {
// re-initialize JavaInstanceRunnable so that variables in constructor
can be re-initialized
this.javaInstanceRunnable = new JavaInstanceRunnable(
instanceConfig,
+ clientBuilder,
pulsarClient,
pulsarAdmin,
stateStorageServiceUrl,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index c020bf9..864a067 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -60,6 +60,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
@Getter
private ThreadGroup threadGroup;
private FunctionCacheManager fnCache;
+ private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
private String storageServiceUrl;
@@ -101,7 +102,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarAdmin = exposePulsarAdminClientEnabled ?
InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null;
- this.pulsarClient = InstanceUtils.createPulsarClient(pulsarServiceUrl,
authConfig, calculateClientMemoryLimit(memoryLimit));
+ this.clientBuilder =
InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authConfig,
calculateClientMemoryLimit(memoryLimit));
+ this.pulsarClient = this.clientBuilder.build();
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
@@ -175,6 +177,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory
{
threadGroup,
jarFile,
pulsarClient,
+ clientBuilder,
pulsarAdmin,
storageServiceUrl,
secretsProvider,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index afc75f5..93e0e37 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -93,6 +93,7 @@ import java.util.concurrent.CompletableFuture;
"org.apache.pulsar.functions.runtime.thread"
})
public class FunctionRuntimeManagerTest {
+ private final String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
@ObjectFactory
public IObjectFactory getObjectFactory() {
@@ -108,7 +109,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -199,7 +200,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
@@ -291,7 +292,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
@@ -427,7 +428,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
PulsarClient pulsarClient = mock(PulsarClient.class);
@@ -538,7 +539,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -684,7 +685,7 @@ public class FunctionRuntimeManagerTest {
ObjectMapperFactory.getThreadLocal()
.convertValue(new KubernetesRuntimeFactoryConfig()
.setSubmittingInsidePod(false), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setPulsarFunctionsCluster("cluster");
@@ -826,7 +827,7 @@ public class FunctionRuntimeManagerTest {
try {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
@@ -852,7 +853,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryClassName("foo");
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(new
KubernetesRuntimeFactoryConfig(), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
@@ -878,7 +879,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryClassName(FunctionRuntimeManagerTest.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(new
KubernetesRuntimeFactoryConfig(), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
new FunctionRuntimeManager(
@@ -904,7 +905,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(new
KubernetesRuntimeFactoryConfig(), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -1013,7 +1014,7 @@ public class FunctionRuntimeManagerTest {
threadContainerFactory.setThreadGroupName("threadGroupName");
workerConfig = new WorkerConfig();
workerConfig.setThreadContainerFactory(threadContainerFactory);
-
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
functionRuntimeManager = new FunctionRuntimeManager(
workerConfig,
mock(PulsarWorkerService.class),
@@ -1039,7 +1040,7 @@ public class FunctionRuntimeManagerTest {
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new
ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -1104,7 +1105,7 @@ public class FunctionRuntimeManagerTest {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index ba8a13a..3f6d4e7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
@@ -166,7 +167,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData>
metricsDataCompletableFuture = new
CompletableFuture<InstanceCommunication.MetricsData>();
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
@@ -214,14 +215,14 @@ public class FunctionsImplTest {
}
@Test
- public void testMetricsEmpty() {
+ public void testMetricsEmpty() throws PulsarClientException {
Function.FunctionDetails.Builder functionDetailsBuilder =
createDefaultFunctionDetails().toBuilder();
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> completableFuture
= new CompletableFuture<InstanceCommunication.MetricsData>();
completableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 3da0d98..0f8d028 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.io.debezium;
import java.util.Map;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
+import io.debezium.relational.history.DatabaseHistory;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.io.core.SourceContext;
@@ -97,6 +98,8 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
setConfigIfNull(config,
PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
+ config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING +
"pulsar.client.builder",
+ SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
super.open(config, sourceContext);
}
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index c3d95a7..d8b37c1 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -61,22 +62,32 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
.withValidation(Field::isRequired);
public static final Field SERVICE_URL =
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
- .withDisplayName("Pulsar broker addresses")
+ .withDisplayName("Pulsar service url")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("Pulsar service url")
- .withValidation(Field::isRequired);
+ .withValidation(Field::isOptional);
+
+ public static final Field CLIENT_BUILDER =
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder")
+ .withDisplayName("Pulsar client builder")
+ .withType(Type.STRING)
+ .withWidth(Width.LONG)
+ .withImportance(Importance.HIGH)
+ .withDescription("Pulsar client builder")
+ .withValidation(Field::isOptional);
public static Field.Set ALL_FIELDS = Field.setOf(
TOPIC,
SERVICE_URL,
+ CLIENT_BUILDER,
DatabaseHistory.NAME);
private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
private String serviceUrl;
private String dbHistoryName;
+ private ClientBuilder clientBuilder;
private volatile PulsarClient pulsarClient;
private volatile Producer<String> producer;
@@ -93,12 +104,22 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
+ getClass().getSimpleName() + "; check the logs for details");
}
this.topicName = config.getString(TOPIC);
- this.serviceUrl = config.getString(SERVICE_URL);
+ if (config.getString(CLIENT_BUILDER) == null &&
config.getString(SERVICE_URL) == null) {
+ throw new IllegalArgumentException("Neither Pulsar Service URL nor
ClientBuilder provided.");
+ }
+ String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
+ this.clientBuilder = PulsarClient.builder();
+ if (null != clientBuilderBase64Encoded) {
+ // deserialize the client builder to the same classloader
+ this.clientBuilder = (ClientBuilder)
SerDeUtils.deserialize(clientBuilderBase64Encoded,
this.clientBuilder.getClass().getClassLoader());
+ } else {
+ this.clientBuilder.serviceUrl(config.getString(SERVICE_URL));
+ }
// Copy the relevant portions of the configuration and add useful
defaults ...
this.dbHistoryName = config.getString(DatabaseHistory.NAME,
UUID.randomUUID().toString());
- log.info("Configure to store the debezium database history {} to
pulsar topic {} at {}",
- dbHistoryName, topicName, serviceUrl);
+ log.info("Configure to store the debezium database history {} to
pulsar topic {}",
+ dbHistoryName, topicName);
}
@Override
@@ -117,12 +138,9 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
void setupClientIfNeeded() {
if (null == this.pulsarClient) {
try {
- pulsarClient = PulsarClient.builder()
- .serviceUrl(serviceUrl)
- .build();
+ pulsarClient = clientBuilder.build();
} catch (PulsarClientException e) {
- throw new RuntimeException("Failed to create pulsar client to
pulsar cluster at "
- + serviceUrl, e);
+ throw new RuntimeException("Failed to create pulsar client to
pulsar cluster", e);
}
}
}
@@ -137,9 +155,9 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
.blockIfQueueFull(true)
.create();
} catch (PulsarClientException e) {
- log.error("Failed to create pulsar producer to topic '{}' at
cluster '{}'", topicName, serviceUrl);
+ log.error("Failed to create pulsar producer to topic '{}'",
topicName);
throw new RuntimeException("Failed to create pulsar producer
to topic '"
- + topicName + "' at cluster '" + serviceUrl + "'", e);
+ + topicName, e);
}
}
}
@@ -253,7 +271,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
@Override
public String toString() {
if (topicName != null) {
- return "Pulsar topic (" + topicName + ") at " + serviceUrl;
+ return "Pulsar topic (" + topicName + ")";
}
return "Pulsar topic";
}
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
new file mode 100644
index 0000000..0297125
--- /dev/null
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.debezium;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.util.Base64;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SerDeUtils { // Static helpers that move an object through Java serialization and Base64 text.
+ public static Object deserialize(String objectBase64Encoded, ClassLoader
classLoader) { // Decodes the Base64 text and reads one object back, resolving classes through classLoader first.
+ byte[] data = Base64.getDecoder().decode(objectBase64Encoded); // Sits outside the try below, so the decoder IllegalArgumentException for malformed Base64 is not wrapped.
+ try (InputStream bai = new ByteArrayInputStream(data);
+ PulsarClientBuilderInputStream ois = new
PulsarClientBuilderInputStream(bai, classLoader)) {
+ return ois.readObject(); // NOTE(review): native Java deserialization - the encoded input should come from a trusted source; confirm against callers.
+ } catch (Exception e) { // Any failure while opening the stream or reading the object is rethrown unchecked with the cause attached.
+ throw new RuntimeException(
+ "Failed to initialize the pulsar client to store debezium
database history", e);
+ }
+ }
+
+ public static String serialize(Object obj) throws Exception { // Writes obj with ObjectOutputStream and returns the resulting bytes as Base64 text.
+ try (ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bao)) {
+ oos.writeObject(obj);
+ oos.flush(); // Flush before copying the buffer so that all written object data is present in bao.
+ byte[] data = bao.toByteArray();
+ return Base64.getEncoder().encodeToString(data);
+ }
+ }
+
+ static class PulsarClientBuilderInputStream extends ObjectInputStream { // ObjectInputStream that tries a caller-supplied class loader before the default class lookup.
+ private final ClassLoader classLoader; // Loader consulted first by resolveClass.
+ public PulsarClientBuilderInputStream(InputStream in, ClassLoader ldr)
throws IOException {
+ super(in);
+ this.classLoader = ldr;
+ }
+
+ protected Class resolveClass(ObjectStreamClass desc) throws
IOException, ClassNotFoundException { // Resolves the class named by desc: supplied loader first, then the superclass lookup.
+ try {
+ return Class.forName(desc.getName(), true, classLoader); // The true flag asks for the class to be initialized on load.
+ } catch (Exception ex) { // A failed lookup is only logged here; resolution continues with the fallback below.
+ log.warn("PulsarClientBuilderInputStream resolveClass failed
{} {}", desc.getName(), ex);
+ }
+ return super.resolveClass(desc); // Fallback: default ObjectInputStream resolution, which may still throw ClassNotFoundException.
+ }
+ }
+}
diff --git
a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
index 6a21812..457320a 100644
---
a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
@@ -30,9 +30,16 @@ import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
import java.util.Map;
+
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -67,15 +74,27 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
super.internalCleanup();
}
- private void testHistoryTopicContent(boolean skipUnparseableDDL) {
+ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean
testWithClientBuilder) throws Exception {
+ Configuration.Builder configBuidler = Configuration.create()
+ .with(PulsarDatabaseHistory.TOPIC, topicName)
+ .with(DatabaseHistory.NAME, "my-db-history")
+ .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
skipUnparseableDDL);
+
+ if (testWithClientBuilder) {
+ ClientBuilder builder =
PulsarClient.builder().serviceUrl(brokerUrl.toString());
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(bao)) {
+ oos.writeObject(builder);
+ oos.flush();
+ byte[] data = bao.toByteArray();
+ configBuidler.with(PulsarDatabaseHistory.CLIENT_BUILDER,
Base64.getEncoder().encodeToString(data));
+ }
+ } else {
+ configBuidler.with(PulsarDatabaseHistory.SERVICE_URL,
brokerUrl.toString());
+ }
+
// Start up the history ...
- Configuration config = Configuration.create()
- .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
- .with(PulsarDatabaseHistory.TOPIC, topicName)
- .with(DatabaseHistory.NAME, "my-db-history")
- .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
skipUnparseableDDL)
- .build();
- history.configure(config, null, DatabaseHistoryListener.NOOP, true);
+ history.configure(configBuidler.build(), null,
DatabaseHistoryListener.NOOP, true);
history.start();
// Should be able to call start more than once ...
@@ -134,7 +153,7 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
// Stop the history (which should stop the producer) ...
history.stop();
history = new PulsarDatabaseHistory();
- history.configure(config, null, DatabaseHistoryListener.NOOP, true);
+ history.configure(configBuidler.build(), null,
DatabaseHistoryListener.NOOP, true);
// no need to start
// Recover from the very beginning to just past the first change ...
@@ -170,7 +189,7 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
@Test
public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState()
throws Exception {
// Create the empty topic ...
- testHistoryTopicContent(false);
+ testHistoryTopicContent(false, true);
}
@Test
@@ -187,7 +206,7 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP
TABLE foo;\"}");
}
- testHistoryTopicContent(true);
+ testHistoryTopicContent(true, true);
}
@Test(expectedExceptions = ParsingException.class)
@@ -196,14 +215,14 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
producer.send("{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP
TABLE foo;\"}");
}
- testHistoryTopicContent(false);
+ testHistoryTopicContent(false, false);
}
@Test
- public void testExists() {
+ public void testExists() throws Exception {
// happy path
- testHistoryTopicContent(true);
+ testHistoryTopicContent(true, false);
assertTrue(history.exists());
// Set history to use dummy topic