This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f38c5aa5b9 [pulsar-broker] PIP-100 Support pluggable topic factory (#12235)
6f38c5aa5b9 is described below

commit 6f38c5aa5b9838c3a2a5fea10e13dd3830947d9f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Jul 20 13:01:44 2022 -0700

    [pulsar-broker] PIP-100 Support pluggable topic factory (#12235)
    
    add Closeable
    
    fix test
    
    close factory and handle exception
---
 conf/broker.conf                                   |  3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../pulsar/broker/service/BrokerService.java       | 43 ++++++++++++-
 .../apache/pulsar/broker/service/TopicFactory.java | 31 ++++++++++
 .../broker/service/PersistentTopicE2ETest.java     | 71 +++++++++++++++++++---
 .../common/naming/ServiceConfigurationTest.java    |  1 -
 site2/docs/reference-configuration.md              |  2 +
 7 files changed, 146 insertions(+), 11 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d2e13d117e8..c19afe981ca 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -136,6 +136,9 @@ brokerShutdownTimeoutMs=60000
 # Flag to skip broker shutdown when broker handles Out of memory error
 skipBrokerShutdownOnOOM=false
 
+# Factory class-name to create topic with custom workflow
+topicFactoryClassName=
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d2e68fb437c..ad4188d2882 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -479,6 +479,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private String metadataStoreConfigPath = null;
 
+    @FieldContext(
+            dynamic = true,
+            doc = "Factory class-name to create topic with custom workflow"
+        )
+    private String topicFactoryClassName;
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Enable backlog quota check. Enforces actions on topic when the 
quota is reached"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e80d603a141..8e701ebf5bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -276,6 +276,7 @@ public class BrokerService implements Closeable {
     private final LongAdder pausedConnections = new LongAdder();
     private BrokerInterceptor interceptor;
     private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;
+    private TopicFactory topicFactory;
 
     private Set<BrokerEntryMetadataInterceptor> 
brokerEntryMetadataInterceptors;
     private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;
@@ -346,6 +347,7 @@ public class BrokerService implements Closeable {
         this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration());
         this.blockedDispatchers =
                 
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
+        this.topicFactory = createPersistentTopicFactory();
         // update dynamic configuration and register-listener
         updateConfigurationAndRegisterListeners();
         this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
@@ -1127,7 +1129,13 @@ public class BrokerService implements Closeable {
                     new NotAllowedException("Broker is not unable to load 
non-persistent topic"));
         }
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
-        NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, 
this);
+        NonPersistentTopic nonPersistentTopic;
+        try {
+            nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
+        } catch (Exception e) {
+            log.warn("Failed to create topic {}", topic, e);
+            return FutureUtil.failedFuture(e);
+        }
         CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
         isOwner.thenRun(() -> {
             nonPersistentTopic.initialize()
@@ -1412,7 +1420,7 @@ public class BrokerService implements Closeable {
                             try {
                                 PersistentTopic persistentTopic = 
isSystemTopic(topic)
                                         ? new SystemTopic(topic, ledger, 
BrokerService.this)
-                                        : new PersistentTopic(topic, ledger, 
BrokerService.this);
+                                        : newTopic(topic, ledger, 
BrokerService.this, PersistentTopic.class);
                                 persistentTopic
                                         .initialize()
                                         .thenCompose(__ -> 
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
@@ -3061,6 +3069,37 @@ public class BrokerService implements Closeable {
         return pausedConnections.longValue();
     }
 
+    @SuppressWarnings("unchecked")
+    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+            Class<T> topicClazz) throws PulsarServerException {
+        if (topicFactory != null) {
+            try {
+                Topic newTopic = topicFactory.create(topic, ledger, 
brokerService, topicClazz);
+                if (newTopic != null) {
+                    return (T) newTopic;
+                }
+            } catch (Throwable e) {
+                log.warn("Failed to create persistent topic using factory {}", 
topic, e);
+                throw new PulsarServerException("Topic factory failed to 
create topic ", e);
+            }
+        }
+        return topicClazz == NonPersistentTopic.class ? (T) new 
NonPersistentTopic(topic, BrokerService.this)
+                : (T) new PersistentTopic(topic, ledger, brokerService);
+    }
+
+    private TopicFactory createPersistentTopicFactory() throws Exception {
+        String topicFactoryClassName = 
pulsar.getConfig().getTopicFactoryClassName();
+        if (StringUtils.isNotBlank(topicFactoryClassName)) {
+            try {
+                return (TopicFactory) 
Class.forName(topicFactoryClassName).newInstance();
+            } catch (Exception e) {
+                log.warn("Failed to initialize topic factory class {}", 
topicFactoryClassName, e);
+                throw e;
+            }
+        }
+        return null;
+    }
+
     @VisibleForTesting
     public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
         this.pulsarChannelInitFactory = factory;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
new file mode 100644
index 00000000000..d407a9cc613
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.io.Closeable;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+
+/**
+ * Pluggable TopicFactory to create topic with specific behavior in broker.
+ * Note: This API and feature is in experimental phase.
+ */
+public interface TopicFactory extends Closeable {
+
+    <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService, Class<T> topicClazz);
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c3f6a634a11..f8ee8a3148d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -25,9 +25,8 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.Sets;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.socket.SocketChannel;
+
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -43,16 +42,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Cleanup;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -96,6 +94,15 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Sets;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.SocketChannel;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 @Test(groups = "flaky")
 public class PersistentTopicE2ETest extends BrokerTestBase {
     private final List<AutoCloseable> closeables = new ArrayList<>();
@@ -104,6 +111,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
     private AtomicInteger inactiveCount;
 
+    @DataProvider(name = "topic")
+    public Object[][] isPersistent() {
+        return new Object[][] { { true }, { false } };
+    }
+
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -1974,4 +1986,47 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
             PersistentTopicE2ETest.this.inactiveCount.incrementAndGet();
         }
     }
-}
+
+    @Test(dataProvider = "topic")
+    public void testPersistentTopicFactory(boolean isPersistent) throws 
Exception {
+        conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+        restartBroker();
+
+        final String topicName = (isPersistent ? "persistent" : 
"non-persistent") + "://prop/ns-abc/factoryTopic"
+                + isPersistent;
+        MyTopicFactory.count.set(0);
+
+        // 1. producer connect
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false)
+                
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
+
+        assertTrue(MyTopicFactory.count.get() > 0);
+        producer.close();
+        consumer.close();
+    }
+
+    public static class MyTopicFactory implements TopicFactory {
+        private static AtomicInteger count = new AtomicInteger(0);
+
+        @Override
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+                Class<T> topicClazz) {
+            try {
+                count.incrementAndGet();
+                if(topicClazz == NonPersistentTopic.class) {
+                    return (T) new NonPersistentTopic(topic, brokerService);
+                }else {
+                    return (T) new PersistentTopic(topic, ledger, 
brokerService); 
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No-op
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 1c7ce107468..55d725b1810 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -238,7 +238,6 @@ public class ServiceConfigurationTest {
                         + key + "' conf/broker.conf default value doesn't 
match java default value\nConf: "+ fileValue + "\nJava: " + javaValue);
             }
         }
-
     }
 
     @Test
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index f7a7fd5bda8..ac9c7530968 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -181,6 +181,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |metadataStoreSessionTimeoutMillis| Metadata store session timeout in milliseconds |30000|
 |brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed  |60000|
 |skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
+|topicFactoryClassName| Factory class-name to create topic with custom workflow. ||
 |backlogQuotaCheckEnabled|  Enable backlog quota check. Enforces action on topic when the quota is reached  |true|
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the quota |60|
 |backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
@@ -515,6 +516,7 @@ You can set the log level and configuration in the  [log4j2.yaml](https://github
 |metadataStoreOperationTimeoutSeconds|Metadata store operation timeout in seconds.|30|
 |brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
 |skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
+|topicFactoryClassName| Factory class-name to create topic with custom workflow. ||
 |backlogQuotaCheckEnabled|  Enable the backlog quota check, which enforces a specified action when the quota is reached.  |true|
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the backlog quota.  |60|
 |backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|

Reply via email to