This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f3bb89d4a68 Revert Add listener interface for namespace service #20406
f3bb89d4a68 is described below
commit f3bb89d4a685a36ffaa1dffa962b90387328fbca
Author: Jiwe Guo <[email protected]>
AuthorDate: Mon Jul 10 17:28:17 2023 +0800
Revert Add listener interface for namespace service #20406
---
.../channel/ServiceUnitStateChannelImpl.java | 1 -
.../namespace/NamespaceBundleSplitListener.java | 29 -------------------
.../pulsar/broker/namespace/NamespaceService.java | 28 ------------------
.../extensions/ExtensibleLoadManagerImplTest.java | 19 -------------
.../namespace/NamespaceCreateBundlesTest.java | 33 ----------------------
5 files changed, 110 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 717ff484fe7..4eb25848fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -930,7 +930,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
log.info("Successfully split {} parent namespace-bundle to {} in {} ms",
parentBundle, childBundles, splitBundleTime);
- namespaceService.onNamespaceBundleSplit(parentBundle);
completionFuture.complete(null);
})
.exceptionally(ex -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
deleted file mode 100644
index a3312f5689e..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceBundleSplitListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.namespace;
-
-import java.util.function.Predicate;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-
-/**
- * Listener for <code>NamespaceBundle</code> split.
- */
-public interface NamespaceBundleSplitListener extends Predicate<NamespaceBundle> {
- void onSplit(NamespaceBundle bundle);
-}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9be8d4938e3..141e48515b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -139,10 +139,6 @@ public class NamespaceService implements AutoCloseable {
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl>
namespaceClients;
private final List<NamespaceBundleOwnershipListener>
bundleOwnershipListeners;
-
- private final List<NamespaceBundleSplitListener> bundleSplitListeners;
-
-
private final RedirectManager redirectManager;
@@ -171,7 +167,6 @@ public class NamespaceService implements AutoCloseable {
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl,
PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
- this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache =
pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
}
@@ -1018,7 +1013,6 @@ public class NamespaceService implements AutoCloseable {
// affect the split operation which is already
safely completed
r.forEach(this::unloadNamespaceBundle);
}
- onNamespaceBundleSplit(bundle);
})
.exceptionally(e -> {
String msg1 = format(
@@ -1261,19 +1255,6 @@ public class NamespaceService implements AutoCloseable {
}
}
}
-
- public void onNamespaceBundleSplit(NamespaceBundle bundle) {
- for (NamespaceBundleSplitListener bundleSplitListener : bundleSplitListeners) {
- try {
- if (bundleSplitListener.test(bundle)) {
- bundleSplitListener.onSplit(bundle);
- }
- } catch (Throwable t) {
- LOG.error("Call bundle {} split listener {} error", bundle, bundleSplitListener, t);
- }
- }
- }
-
public void
addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener...
listeners) {
Objects.requireNonNull(listeners);
for (NamespaceBundleOwnershipListener listener : listeners) {
@@ -1284,15 +1265,6 @@ public class NamespaceService implements AutoCloseable {
getOwnedServiceUnits().forEach(bundle ->
notifyNamespaceBundleOwnershipListener(bundle, listeners));
}
- public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
- Objects.requireNonNull(listeners);
- for (NamespaceBundleSplitListener listener : listeners) {
- if (listener != null) {
- bundleSplitListeners.add(listener);
- }
- }
- }
-
private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
NamespaceBundleOwnershipListener... listeners) {
if (listeners != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 533e6178d79..cff92ab27c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -88,7 +88,6 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
-import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -406,23 +405,6 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
String firstBundle = bundleRanges.get(0) + "_" + bundleRanges.get(1);
- AtomicInteger splitCount = new AtomicInteger(0);
- NamespaceBundleSplitListener namespaceBundleSplitListener = new NamespaceBundleSplitListener() {
- @Override
- public void onSplit(NamespaceBundle bundle) {
- splitCount.incrementAndGet();
- }
-
- @Override
- public boolean test(NamespaceBundle namespaceBundle) {
- return namespaceBundle
- .toString()
- .equals(String.format(namespace + "/0x%08x_0x%08x", bundleRanges.get(0), bundleRanges.get(1)));
- }
- };
- pulsar1.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
- pulsar2.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
-
long mid = bundleRanges.get(0) + (bundleRanges.get(1) -
bundleRanges.get(0)) / 2;
admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true,
null);
@@ -435,7 +417,6 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
- assertEquals(splitCount.get(), 1);
// Test split bundle with invalid bundle range.
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
index 73cfaf1b0d9..43d37466918 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceCreateBundlesTest.java
@@ -20,21 +20,15 @@ package org.apache.pulsar.broker.namespace;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-import lombok.Cleanup;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.Policies;
-import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -87,31 +81,4 @@ public class NamespaceCreateBundlesTest extends BrokerTestBase {
assertNotNull(admin.namespaces().getBookieAffinityGroup(namespaceName));
producer.close();
}
-
- @Test
- public void testBundleSplitListener() throws Exception {
- String namespaceName = "prop/" + UUID.randomUUID().toString();
- String topicName = "persistent://" + namespaceName + "/my-topic5";
- admin.namespaces().createNamespace(namespaceName);
- @Cleanup
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).sendTimeout(1,
- TimeUnit.SECONDS).create();
- producer.send(new byte[1]);
- String bundleRange = admin.lookups().getBundleRange(topicName);
- AtomicBoolean isTriggered = new AtomicBoolean(false);
- pulsar.getNamespaceService().addNamespaceBundleSplitListener(new NamespaceBundleSplitListener() {
- @Override
- public void onSplit(NamespaceBundle bundle) {
- assertEquals(bundleRange, bundle.getBundleRange());
- isTriggered.set(true);
- }
-
- @Override
- public boolean test(NamespaceBundle namespaceBundle) {
- return true;
- }
- });
- admin.namespaces().splitNamespaceBundle(namespaceName, bundleRange, false, null);
- Awaitility.await().untilAsserted(() -> assertTrue(isTriggered.get()));
- }
}