This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f38c5aa5b9 [pulsar-broker] PIP-100 Support pluggable topic factory
(#12235)
6f38c5aa5b9 is described below
commit 6f38c5aa5b9838c3a2a5fea10e13dd3830947d9f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Jul 20 13:01:44 2022 -0700
[pulsar-broker] PIP-100 Support pluggable topic factory (#12235)
add Closeable
fix test
close factory and handle exception
---
conf/broker.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
.../pulsar/broker/service/BrokerService.java | 43 ++++++++++++-
.../apache/pulsar/broker/service/TopicFactory.java | 31 ++++++++++
.../broker/service/PersistentTopicE2ETest.java | 71 +++++++++++++++++++---
.../common/naming/ServiceConfigurationTest.java | 1 -
site2/docs/reference-configuration.md | 2 +
7 files changed, 146 insertions(+), 11 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index d2e13d117e8..c19afe981ca 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -136,6 +136,9 @@ brokerShutdownTimeoutMs=60000
# Flag to skip broker shutdown when broker handles Out of memory error
skipBrokerShutdownOnOOM=false
+# Factory class-name to create topic with custom workflow
+topicFactoryClassName=
+
# Enable backlog quota check. Enforces action on topic when the quota is
reached
backlogQuotaCheckEnabled=true
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d2e68fb437c..ad4188d2882 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -479,6 +479,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private String metadataStoreConfigPath = null;
+ @FieldContext(
+ dynamic = true,
+ doc = "Factory class-name to create topic with custom workflow"
+ )
+ private String topicFactoryClassName;
+
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable backlog quota check. Enforces actions on topic when the
quota is reached"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e80d603a141..8e701ebf5bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -276,6 +276,7 @@ public class BrokerService implements Closeable {
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;
+ private TopicFactory topicFactory;
private Set<BrokerEntryMetadataInterceptor>
brokerEntryMetadataInterceptors;
private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
@@ -346,6 +347,7 @@ public class BrokerService implements Closeable {
this.authenticationService = new
AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
+ this.topicFactory = createPersistentTopicFactory();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
@@ -1127,7 +1129,13 @@ public class BrokerService implements Closeable {
new NotAllowedException("Broker is not unable to load
non-persistent topic"));
}
final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
- NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic,
this);
+ NonPersistentTopic nonPersistentTopic;
+ try {
+ nonPersistentTopic = newTopic(topic, null, this,
NonPersistentTopic.class);
+ } catch (Exception e) {
+ log.warn("Failed to create topic {}", topic, e);
+ return FutureUtil.failedFuture(e);
+ }
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
nonPersistentTopic.initialize()
@@ -1412,7 +1420,7 @@ public class BrokerService implements Closeable {
try {
PersistentTopic persistentTopic =
isSystemTopic(topic)
? new SystemTopic(topic, ledger,
BrokerService.this)
- : new PersistentTopic(topic, ledger,
BrokerService.this);
+ : newTopic(topic, ledger,
BrokerService.this, PersistentTopic.class);
persistentTopic
.initialize()
.thenCompose(__ ->
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
@@ -3061,6 +3069,37 @@ public class BrokerService implements Closeable {
return pausedConnections.longValue();
}
+ @SuppressWarnings("unchecked")
+ private <T extends Topic> T newTopic(String topic, ManagedLedger ledger,
BrokerService brokerService,
+ Class<T> topicClazz) throws PulsarServerException {
+ if (topicFactory != null) {
+ try {
+ Topic newTopic = topicFactory.create(topic, ledger,
brokerService, topicClazz);
+ if (newTopic != null) {
+ return (T) newTopic;
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to create persistent topic using factory {}",
topic, e);
+ throw new PulsarServerException("Topic factory failed to
create topic ", e);
+ }
+ }
+ return topicClazz == NonPersistentTopic.class ? (T) new
NonPersistentTopic(topic, BrokerService.this)
+ : (T) new PersistentTopic(topic, ledger, brokerService);
+ }
+
+ private TopicFactory createPersistentTopicFactory() throws Exception {
+ String topicFactoryClassName =
pulsar.getConfig().getTopicFactoryClassName();
+ if (StringUtils.isNotBlank(topicFactoryClassName)) {
+ try {
+ return (TopicFactory)
Class.forName(topicFactoryClassName).newInstance();
+ } catch (Exception e) {
+ log.warn("Failed to initialize topic factory class {}",
topicFactoryClassName, e);
+ throw e;
+ }
+ }
+ return null;
+ }
+
@VisibleForTesting
public void
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
new file mode 100644
index 00000000000..d407a9cc613
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.io.Closeable;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+
+/**
+ * Pluggable TopicFactory to create topic with specific behavior in broker.
+ * Note: This API and feature is in experimental phase.
+ */
+public interface TopicFactory extends Closeable {
+
+ <T extends Topic> T create(String topic, ManagedLedger ledger,
BrokerService brokerService, Class<T> topicClazz);
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c3f6a634a11..f8ee8a3148d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -25,9 +25,8 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.socket.SocketChannel;
+
+import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -43,16 +42,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Cleanup;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -96,6 +94,15 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import com.google.common.collect.Sets;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
@Test(groups = "flaky")
public class PersistentTopicE2ETest extends BrokerTestBase {
private final List<AutoCloseable> closeables = new ArrayList<>();
@@ -104,6 +111,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase
{
private AtomicInteger inactiveCount;
+ @DataProvider(name = "topic")
+ public Object[][] isPersistent() {
+ return new Object[][] { { true }, { false } };
+ }
+
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
@@ -1974,4 +1986,47 @@ public class PersistentTopicE2ETest extends
BrokerTestBase {
PersistentTopicE2ETest.this.inactiveCount.incrementAndGet();
}
}
-}
+
+ @Test(dataProvider = "topic")
+ public void testPersistentTopicFactory(boolean isPersistent) throws
Exception {
+ conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+ restartBroker();
+
+ final String topicName = (isPersistent ? "persistent" :
"non-persistent") + "://prop/ns-abc/factoryTopic"
+ + isPersistent;
+ MyTopicFactory.count.set(0);
+
+ // 1. producer connect
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).enableBatching(false)
+
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
+
+ assertTrue(MyTopicFactory.count.get() > 0);
+ producer.close();
+ consumer.close();
+ }
+
+ public static class MyTopicFactory implements TopicFactory {
+ private static AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public <T extends Topic> T create(String topic, ManagedLedger ledger,
BrokerService brokerService,
+ Class<T> topicClazz) {
+ try {
+ count.incrementAndGet();
+ if(topicClazz == NonPersistentTopic.class) {
+ return (T) new NonPersistentTopic(topic, brokerService);
+ }else {
+ return (T) new PersistentTopic(topic, ledger,
brokerService);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 1c7ce107468..55d725b1810 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -238,7 +238,6 @@ public class ServiceConfigurationTest {
+ key + "' conf/broker.conf default value doesn't
match java default value\nConf: "+ fileValue + "\nJava: " + javaValue);
}
}
-
}
@Test
diff --git a/site2/docs/reference-configuration.md
b/site2/docs/reference-configuration.md
index f7a7fd5bda8..ac9c7530968 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -181,6 +181,7 @@ Pulsar brokers are responsible for handling incoming
messages from producers, di
|metadataStoreSessionTimeoutMillis| Metadata store session timeout in
milliseconds |30000|
|brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After
this time elapses, the process will be killed |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out
of memory error. |false|
+|topicFactoryClassName| Factory class-name to create topic with custom
workflow. ||
|backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on
topic when the quota is reached |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have
reached the quota |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit.
Being less than 0 means no limitation. By default, it is -1. | -1 |
@@ -515,6 +516,7 @@ You can set the log level and configuration in the
[log4j2.yaml](https://github
|metadataStoreOperationTimeoutSeconds|Metadata store operation timeout in
seconds.|30|
|brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After
this time elapses, the process will be killed. |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out
of memory error. |false|
+|topicFactoryClassName| Factory class-name to create topic with custom
workflow. ||
|backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a
specified action when the quota is reached. |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have
reached the backlog quota. |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit.
Being less than 0 means no limitation. By default, it is -1. |-1|