This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f3bb89d4a68 Revert Add listener interface for namespace service #20406
f3bb89d4a68 is described below

commit f3bb89d4a685a36ffaa1dffa962b90387328fbca
Author: Jiwe Guo <[email protected]>
AuthorDate: Mon Jul 10 17:28:17 2023 +0800

    Revert Add listener interface for namespace service #20406
---
 .../channel/ServiceUnitStateChannelImpl.java       |  1 -
 .../namespace/NamespaceBundleSplitListener.java    | 29 -------------------
 .../pulsar/broker/namespace/NamespaceService.java  | 28 ------------------
 .../extensions/ExtensibleLoadManagerImplTest.java  | 19 -------------
 .../namespace/NamespaceCreateBundlesTest.java      | 33 ----------------------
 5 files changed, 110 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 717ff484fe7..4eb25848fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -930,7 +930,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                     double splitBundleTime = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
                     log.info("Successfully split {} parent namespace-bundle to 
{} in {} ms",
                             parentBundle, childBundles, splitBundleTime);
-                    namespaceService.onNamespaceBundleSplit(parentBundle);
                     completionFuture.complete(null);
                 })
                 .exceptionally(ex -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
deleted file mode 100644
index a3312f5689e..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.namespace;
-
-import java.util.function.Predicate;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-
-/**
- * Listener for <code>NamespaceBundle</code> split.
- */
-public interface NamespaceBundleSplitListener extends 
Predicate<NamespaceBundle> {
-    void onSplit(NamespaceBundle bundle);
-}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9be8d4938e3..141e48515b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -139,10 +139,6 @@ public class NamespaceService implements AutoCloseable {
     private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> 
namespaceClients;
 
     private final List<NamespaceBundleOwnershipListener> 
bundleOwnershipListeners;
-
-    private final List<NamespaceBundleSplitListener> bundleSplitListeners;
-
-
     private final RedirectManager redirectManager;
 
 
@@ -171,7 +167,6 @@ public class NamespaceService implements AutoCloseable {
         this.namespaceClients =
                 ConcurrentOpenHashMap.<ClusterDataImpl, 
PulsarClientImpl>newBuilder().build();
         this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
-        this.bundleSplitListeners = new CopyOnWriteArrayList<>();
         this.localBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
         this.redirectManager = new RedirectManager(pulsar);
     }
@@ -1018,7 +1013,6 @@ public class NamespaceService implements AutoCloseable {
                                 // affect the split operation which is already 
safely completed
                                 r.forEach(this::unloadNamespaceBundle);
                             }
-                            onNamespaceBundleSplit(bundle);
                         })
                         .exceptionally(e -> {
                             String msg1 = format(
@@ -1261,19 +1255,6 @@ public class NamespaceService implements AutoCloseable {
             }
         }
     }
-
-    public void onNamespaceBundleSplit(NamespaceBundle bundle) {
-        for (NamespaceBundleSplitListener bundleSplitListener : 
bundleSplitListeners) {
-            try {
-                if (bundleSplitListener.test(bundle)) {
-                    bundleSplitListener.onSplit(bundle);
-                }
-            } catch (Throwable t) {
-                LOG.error("Call bundle {} split listener {} error", bundle, 
bundleSplitListener, t);
-            }
-        }
-    }
-
     public void 
addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... 
listeners) {
         Objects.requireNonNull(listeners);
         for (NamespaceBundleOwnershipListener listener : listeners) {
@@ -1284,15 +1265,6 @@ public class NamespaceService implements AutoCloseable {
         getOwnedServiceUnits().forEach(bundle -> 
notifyNamespaceBundleOwnershipListener(bundle, listeners));
     }
 
-    public void 
addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
-        Objects.requireNonNull(listeners);
-        for (NamespaceBundleSplitListener listener : listeners) {
-            if (listener != null) {
-                bundleSplitListeners.add(listener);
-            }
-        }
-    }
-
     private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
                     NamespaceBundleOwnershipListener... listeners) {
         if (listeners != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 533e6178d79..cff92ab27c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -88,7 +88,6 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
-import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -406,23 +405,6 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
 
         String firstBundle = bundleRanges.get(0) + "_" + bundleRanges.get(1);
 
-        AtomicInteger splitCount = new AtomicInteger(0);
-        NamespaceBundleSplitListener namespaceBundleSplitListener = new 
NamespaceBundleSplitListener() {
-            @Override
-            public void onSplit(NamespaceBundle bundle) {
-                splitCount.incrementAndGet();
-            }
-
-            @Override
-            public boolean test(NamespaceBundle namespaceBundle) {
-                return namespaceBundle
-                        .toString()
-                        .equals(String.format(namespace + "/0x%08x_0x%08x", 
bundleRanges.get(0), bundleRanges.get(1)));
-            }
-        };
-        
pulsar1.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
-        
pulsar2.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
-
         long mid = bundleRanges.get(0) + (bundleRanges.get(1) - 
bundleRanges.get(0)) / 2;
 
         admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, 
null);
@@ -435,7 +417,6 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
         assertTrue(bundlesData.getBoundaries().contains(lowBundle));
         assertTrue(bundlesData.getBoundaries().contains(midBundle));
         assertTrue(bundlesData.getBoundaries().contains(highBundle));
-        assertEquals(splitCount.get(), 1);
 
         // Test split bundle with invalid bundle range.
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
index 73cfaf1b0d9..43d37466918 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
@@ -20,21 +20,15 @@ package org.apache.pulsar.broker.namespace;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-import lombok.Cleanup;
 
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -87,31 +81,4 @@ public class NamespaceCreateBundlesTest extends BrokerTestBase {
         
assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName));
         producer.close();
     }
-
-    @Test
-    public void testBundleSplitListener() throws Exception {
-        String namespaceName = "prop/" + UUID.randomUUID().toString();
-        String topicName = "persistent://" + namespaceName + "/my-topic5";
-        admin.namespaces().createNamespace(namespaceName);
-        @Cleanup
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).sendTimeout(1,
-            TimeUnit.SECONDS).create();
-        producer.send(new byte[1]);
-        String bundleRange = admin.lookups().getBundleRange(topicName);
-        AtomicBoolean isTriggered = new AtomicBoolean(false);
-        pulsar.getNamespaceService().addNamespaceBundleSplitListener(new 
NamespaceBundleSplitListener() {
-            @Override
-            public void onSplit(NamespaceBundle bundle) {
-                assertEquals(bundleRange, bundle.getBundleRange());
-                isTriggered.set(true);
-            }
-
-            @Override
-            public boolean test(NamespaceBundle namespaceBundle) {
-                return true;
-            }
-        });
-        admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange, 
false, null);
-        Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get()));
-    }
 }

Reply via email to