This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce102dac329 [fix][broker]Data lost due to conflict loaded up a topic
for two brokers, when enabled ServiceUnitStateMetadataStoreTableViewImpl
(#24478)
ce102dac329 is described below
commit ce102dac32978740b8dd7ddb1598ce3263ef6202
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 9 10:12:38 2025 +0800
[fix][broker]Data lost due to conflict loaded up a topic for two brokers,
when enabled ServiceUnitStateMetadataStoreTableViewImpl (#24478)
---
.../channel/ServiceUnitStateChannelImpl.java | 23 ++-
...ServiceUnitStateMetadataStoreTableViewImpl.java | 30 ++-
.../channel/ServiceUnitStateTableView.java | 15 +-
.../channel/ServiceUnitStateTableViewBase.java | 7 +-
.../channel/ServiceUnitStateTableViewImpl.java | 9 +-
.../channel/ServiceUnitStateTableViewSyncer.java | 4 +
.../ExtensibleLoadManagerImplBaseTest.java | 8 +
.../channel/ServiceUnitStateChannelTest.java | 38 +++-
.../broker/service/NetworkErrorTestBase.java | 20 ++
.../pulsar/broker/service/ZkSessionExpireTest.java | 146 ++++++++++++---
.../tableview/impl/MetadataStoreTableViewImpl.java | 206 ++++++++++++++++-----
11 files changed, 419 insertions(+), 87 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 3e2a9c7468d..ecd98f188b4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -315,7 +315,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
pulsar.getConfiguration().getDefaultNumberOfNamespaceBundles());
tableview = createServiceUnitStateTableView();
- tableview.start(pulsar, this::handleEvent, this::handleExisting);
+ tableview.start(pulsar, this::handleEvent, this::handleExisting,
this::handleInvalidate);
if (debug) {
log.info("Successfully started the channel tableview.");
@@ -465,7 +465,12 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
// we return the owner without its activeness check.
// This broker tries to serve lookups on a best efforts basis when
metadata store connection is unstable.
if (!brokerRegistry.isRegistered()) {
- return CompletableFuture.completedFuture(owner);
+ if (tableview.isMetadataStoreBased()) {
+ return FutureUtil.failedFuture(new
MetadataStoreException("broker is unavailable so far because it is"
+ + " in the state that tries to reconnect to the metadata
store."));
+ } else {
+ return CompletableFuture.completedFuture(owner);
+ }
}
return dedupeGetOwnerRequest(serviceUnit)
@@ -736,6 +741,20 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
+ /***
+ * Called when the {@link #tableview} cannot determine the ownership of the
service unit.
+ * This often happens when the current broker cannot connect to others.
+ */
+ private void handleInvalidate(String serviceUnit, ServiceUnitStateData
data) {
+ closeServiceUnit(serviceUnit, true).whenComplete((__, ex) -> {
+ if (ex == null) {
+ log.info("Unloaded serviceUnit:{} because the ownership is
invalidate", serviceUnit);
+ return;
+ }
+ log.error("Failed to unload serviceUnit:{} after the ownership is
invalidate", serviceUnit, ex);
+ });
+ }
+
private static boolean isTransferCommand(ServiceUnitStateData data) {
if (data == null) {
return false;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
index f488b31c774..f70fbbb45c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java
@@ -31,10 +31,12 @@ import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.tableview.impl.MetadataStoreTableViewImpl;
@Slf4j
@@ -54,13 +56,23 @@ public class ServiceUnitStateMetadataStoreTableViewImpl
extends ServiceUnitState
private ServiceUnitStateDataConflictResolver conflictResolver;
private volatile MetadataStoreTableView<ServiceUnitStateData> tableview;
+ @Override
public void start(PulsarService pulsar,
BiConsumer<String, ServiceUnitStateData>
tailItemListener,
- BiConsumer<String, ServiceUnitStateData>
existingItemListener)
+ BiConsumer<String, ServiceUnitStateData>
existingItemListener,
+ BiConsumer<String, ServiceUnitStateData>
outdatedItemListeners)
throws MetadataStoreException {
init(pulsar);
conflictResolver = new ServiceUnitStateDataConflictResolver();
conflictResolver.setStorageType(MetadataStore);
+ if (!(pulsar.getLocalMetadataStore() instanceof AbstractMetadataStore)
+ &&
!MetadataSessionExpiredPolicy.shutdown.equals(pulsar.getConfig().getZookeeperSessionExpiredPolicy()))
{
+ String errorMsg = String.format("Your current metadata store [%s]
does not support the registration of "
+ + "session event listeners. Please set
\"zookeeperSessionExpiredPolicy\" to \"shutdown\";"
+ + " otherwise, you will encounter the issue that messages
lost because of conflicted topic loading",
+ pulsar.getLocalMetadataStore().getClass().getName());
+ log.warn(errorMsg);
+ }
tableview = new
MetadataStoreTableViewImpl<>(ServiceUnitStateData.class,
pulsar.getBrokerId(),
pulsar.getLocalMetadataStore(),
@@ -69,12 +81,21 @@ public class ServiceUnitStateMetadataStoreTableViewImpl
extends ServiceUnitState
this::validateServiceUnitPath,
List.of(this::updateOwnedServiceUnits, tailItemListener),
List.of(this::updateOwnedServiceUnits, existingItemListener),
-
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds())
+ List.of(this::invalidateOwnedServiceUnits,
outdatedItemListeners),
+ true,
+
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds()),
+ t -> handleTableViewShutDownEvent(t)
);
tableview.start();
}
+ protected void handleTableViewShutDownEvent(Throwable throwable) {
+ log.error("The component of load-balance, which named metadata store
table view has shutdown. This Broker can"
+ + " not work anymore, start tp shutdow,");
+ pulsar.shutdownNow();
+ }
+
protected boolean resolveConflict(ServiceUnitStateData prev,
ServiceUnitStateData cur) {
return !conflictResolver.shouldKeepLeft(prev, cur);
}
@@ -131,6 +152,11 @@ public class ServiceUnitStateMetadataStoreTableViewImpl
extends ServiceUnitState
// no-op
}
+ @Override
+ public boolean isMetadataStoreBased() {
+ return true;
+ }
+
@Override
public CompletableFuture<Void> delete(String key) {
if (!isValidState()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
index 5ac57fe5c19..e75bbfeb1a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java
@@ -47,10 +47,18 @@ public interface ServiceUnitStateTableView extends
Closeable {
* @param tailItemListener listener to listen tail(newly updated) items
* @param existingItemListener listener to listen existing items
* @throws IOException if it fails to init the tableview.
+ * @param itemOutdatedListeners How the table view ensures correct
element values: 1. The table
+ * view will try its best to ensure that the data is always the
latest. When that cannot be guaranteed, see
+ * the next item. 2. If you get an old value, you will receive an
update event later, ultimately ensuring
+ * the accuracy of the data. 3. You will receive this notification
when the first two cannot be guaranteed
+ * due to the expiration of the metadata store session of the table
view. After that, you will also receive
+ * notifications via {@code tailItemListener} and {@code
existingItemListener}, indicating that the table
+ * view can once again ensure that the first two work properly.
*/
void start(PulsarService pulsar,
BiConsumer<String, ServiceUnitStateData> tailItemListener,
- BiConsumer<String, ServiceUnitStateData> existingItemListener)
throws IOException;
+ BiConsumer<String, ServiceUnitStateData> existingItemListener,
+ BiConsumer<String, ServiceUnitStateData> itemOutdatedListeners)
throws IOException;
/**
@@ -110,4 +118,9 @@ public interface ServiceUnitStateTableView extends
Closeable {
* @throws TimeoutException
*/
void flush(long waitDurationInMillis) throws ExecutionException,
InterruptedException, TimeoutException;
+
+ /**
+ * Whether it depends on the local metadata store.
+ */
+ boolean isMetadataStoreBased();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
index b690ef101e1..a236de89f0e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java
@@ -43,7 +43,7 @@ abstract class ServiceUnitStateTableViewBase implements
ServiceUnitStateTableVie
private final Map<NamespaceBundle, Boolean> ownedServiceUnitsMap = new
ConcurrentHashMap<>();
private final Set<NamespaceBundle> ownedServiceUnits =
Collections.unmodifiableSet(ownedServiceUnitsMap.keySet());
private String brokerId;
- private PulsarService pulsar;
+ protected PulsarService pulsar;
protected void init(PulsarService pulsar) throws MetadataStoreException {
this.pulsar = pulsar;
this.brokerId = pulsar.getBrokerId();
@@ -89,4 +89,9 @@ abstract class ServiceUnitStateTableViewBase implements
ServiceUnitStateTableVie
}
});
}
+
+ protected void invalidateOwnedServiceUnits(String key,
ServiceUnitStateData outdatedVal) {
+ NamespaceBundle namespaceBundle =
LoadManagerShared.getNamespaceBundle(pulsar, key);
+ ownedServiceUnitsMap.compute(namespaceBundle, (k, v) -> false);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
index 12cf87445a3..d1ac87fdf3c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java
@@ -51,9 +51,11 @@ public class ServiceUnitStateTableViewImpl extends
ServiceUnitStateTableViewBase
private volatile Producer<ServiceUnitStateData> producer;
private volatile TableView<ServiceUnitStateData> tableview;
+ @Override
public void start(PulsarService pulsar,
BiConsumer<String, ServiceUnitStateData>
tailItemListener,
- BiConsumer<String, ServiceUnitStateData>
existingItemListener) throws IOException {
+ BiConsumer<String, ServiceUnitStateData>
existingItemListener,
+ BiConsumer<String, ServiceUnitStateData>
itemOutdatedListeners) throws IOException {
boolean debug =
ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
init(pulsar);
@@ -175,6 +177,11 @@ public class ServiceUnitStateTableViewImpl extends
ServiceUnitStateTableViewBase
tableview.refreshAsync().get(waitTimeMs, MILLISECONDS);
}
+ @Override
+ public boolean isMetadataStoreBased() {
+ return false;
+ }
+
@Override
public CompletableFuture<Void> delete(String key) {
return put(key, null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
index 10ab39a66d2..45ff0dcb267 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java
@@ -100,6 +100,7 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
metadataStoreTableView.start(
pulsar,
this::dummy,
+ this::dummy,
this::dummy
);
@@ -108,6 +109,7 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
systemTopicTableView.start(
pulsar,
this::dummy,
+ this::dummy,
this::dummy
);
@@ -152,6 +154,7 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
this.metadataStoreTableView.start(
pulsar,
this::syncToSystemTopic,
+ this::dummy,
this::dummy
);
log.info("Started MetadataStoreTableView");
@@ -160,6 +163,7 @@ public class ServiceUnitStateTableViewSyncer implements
Closeable {
this.systemTopicTableView.start(
pulsar,
this::syncToMetadataStore,
+ this::dummy,
this::dummy
);
log.info("Started SystemTopicTableView");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index bb224cdf7c4..fff5761eb48 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +79,13 @@ public abstract class ExtensibleLoadManagerImplBaseTest
extends MockedPulsarServ
protected String serviceUnitStateTableViewClassName;
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+
conf.setZookeeperSessionExpiredPolicy(MetadataSessionExpiredPolicy.shutdown);
+ return conf;
+ }
+
protected ArrayList<PulsarClient> clients = new ArrayList<>();
@DataProvider(name = "serviceUnitStateTableViewClassName")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 63d6bbb652a..bbde38bfbec 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -1791,15 +1792,34 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertTrue(ex.getCause() instanceof IllegalStateException);
assertTrue(System.currentTimeMillis() - start >= 1000);
- try {
- // verify getOwnerAsync returns immediately when not registered
- registry.unregister();
- start = System.currentTimeMillis();
- assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
- elapsed = System.currentTimeMillis() - start;
- assertTrue(elapsed < 1000);
- } finally {
- registry.registerAsync().join();
+ if
(pulsar1.getConfig().getLoadManagerServiceUnitStateTableViewClassName()
+ .equals(ServiceUnitStateTableViewImpl.class.getName())) {
+ try {
+ // verify getOwnerAsync returns immediately when not registered
+ registry.unregister();
+ start = System.currentTimeMillis();
+ assertEquals(broker,
channel1.getOwnerAsync(bundle).get().get());
+ elapsed = System.currentTimeMillis() - start;
+ assertTrue(elapsed < 1000);
+ } finally {
+ registry.registerAsync().join();
+ }
+ }
+
+ if
(pulsar1.getConfig().getLoadManagerServiceUnitStateTableViewClassName()
+
.equals(ServiceUnitStateMetadataStoreTableViewImpl.class.getName())) {
+ try {
+ // verify getOwnerAsync fails with MetadataStoreException when not registered
+ registry.unregister();
+ channel1.getOwnerAsync(bundle).get().get();
+ fail("Request should fail because it is in the state that
tries to reconnect to the metadata store");
+ } catch (Exception e) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(e);
+ assertTrue(actEx instanceof MetadataStoreException);
+ assertTrue(actEx.getMessage().contains("reconnect to the
metadata store."));
+ } finally {
+ registry.registerAsync().join();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
index 0161a4a63cf..4a0c73b4e00 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -23,17 +23,20 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -301,6 +304,23 @@ public abstract class NetworkErrorTestBase extends
TestRetrySupport {
}
}
+ public static Collection<String> getOwnedBundles(PulsarService pulsar) {
+ Object loadManagerWrapper = pulsar.getLoadManager().get();
+ Object loadManager = WhiteboxImpl.getInternalState(loadManagerWrapper,
"loadManager");
+ if (loadManager instanceof ModularLoadManagerImpl) {
+ return
pulsar.getNamespaceService().getOwnershipCache().getOwnedBundles()
+ .keySet().stream().map(k -> k.getNamespaceObject().toString()
+ "/" + k.getBundleRange())
+ .collect(Collectors.toList());
+ } else if (loadManager instanceof ExtensibleLoadManagerImpl
extensibleLoadManager) {
+ ServiceUnitStateChannel serviceUnitStateChannel =
extensibleLoadManager.getServiceUnitStateChannel();
+ return serviceUnitStateChannel.getOwnedServiceUnits().stream()
+ .map(k -> k.getNamespaceObject().toString() + "/" +
k.getBundleRange())
+ .collect(Collectors.toList());
+ } else {
+ throw new RuntimeException("Not support for the load manager: " +
loadManager.getClass().getName());
+ }
+ }
+
public void clearPreferBroker() {
preferBroker.set(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
index b05dae2c3c2..339e5a88786 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
@@ -19,22 +19,34 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableView;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -64,22 +76,45 @@ public class ZkSessionExpireTest extends
NetworkErrorTestBase {
@DataProvider(name = "settings")
public Object[][] settings() {
return new Object[][]{
- {false, NetworkErrorTestBase.PreferBrokerModularLoadManager.class},
- {true, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}
+// {false,
NetworkErrorTestBase.PreferBrokerModularLoadManager.class, null},
+// {true,
NetworkErrorTestBase.PreferBrokerModularLoadManager.class, null},
+ {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class,
TableViewType.MetadataStore},
// Create a separate PR to add this test case.
- // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class}.
+ // {true, NetworkErrorTestBase.PreferExtensibleLoadManager.class,
TableViewType.SystemTopic}
};
}
- @Test(timeOut = 600 * 1000, dataProvider = "settings")
- public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic,
Class loadManager) throws Exception {
+ private enum TableViewType {
+ SystemTopic,
+ MetadataStore;
+ }
+
+ @Test(timeOut = 180 * 1000, dataProvider = "settings")
+ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic,
Class loadManager,
+ TableViewType
tableViewType) throws Exception {
// Setup.
setupWithSettings(config -> {
config.setManagedLedgerMaxEntriesPerLedger(1);
config.setSystemTopicEnabled(enableSystemTopic);
config.setTopicLevelPoliciesEnabled(enableSystemTopic);
- if (loadManager != null) {
- config.setLoadManagerClassName(loadManager.getName());
+ // Set load manager.
+ if (loadManager == null) {
+ return;
+ }
+ config.setLoadManagerClassName(loadManager.getName());
+ if (loadManager.getSimpleName().equals("ModularLoadManagerImpl")) {
+ return;
+ }
+ // Set params of metadata store table view.
+ if
(loadManager.getSimpleName().equals("PreferExtensibleLoadManager")) {
+ if (tableViewType == TableViewType.MetadataStore) {
+
config.setLoadManagerServiceUnitStateTableViewClassName("org.apache.pulsar.broker.loadbalance"
+ +
".extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl");
+
config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.extensions"
+ + ".scheduler.TransferShedder");
+ config.setLoadBalancerSheddingEnabled(true);
+ config.setLoadBalancerTransferEnabled(true);
+ }
}
});
@@ -104,6 +139,8 @@ public class ZkSessionExpireTest extends
NetworkErrorTestBase {
ProducerImpl<String> p1 = (ProducerImpl<String>)
client1.newProducer(Schema.STRING).topic(topicName)
.sendTimeout(10, TimeUnit.SECONDS).create();
Topic broker1Topic1 = pulsar1.getBrokerService().getTopic(topicName,
false).join().get();
+ final String brokerAddr1 = admin1.lookups().lookupTopic(topicName);
+ log.info("the addr of broker 1: {}", brokerAddr1);
assertNotNull(broker1Topic1);
clearPreferBroker();
@@ -139,13 +176,24 @@ public class ZkSessionExpireTest extends
NetworkErrorTestBase {
// Publishing to broker-2 will success.
CompletableFuture<MessageId> broker1Send2 =
p1.sendAsync("broker1_msg2");
CompletableFuture<MessageId> broker2Send2 =
p2.sendAsync("broker2_msg2");
+ admin1.brokers().getActiveBrokers();
try {
broker1Send1.join();
broker1Send2.join();
p1.getClientCnx();
- fail("expected a publish error");
+ // Since system topic load balancer does not rely on ZK, the
publishing can succeed.
+ if (loadManager.getSimpleName().equals("ModularLoadManagerImpl")
+ || TableViewType.MetadataStore.equals(tableViewType)) {
+ fail("Since the client can not get a correct response of
lookup from the broker who can not connect to"
+ + " metadata store, it should failed to publish");
+ }
} catch (Exception ex) {
- // Expected.
+ // Since topic transferring of the system-topic-based load balancer
does not rely on ZK, both publishes can
+ // succeed.
+ if (TableViewType.SystemTopic.equals(tableViewType)) {
+ fail("Since topic transferring of system topic base load
balancer does not rely on ZK, the both publish"
+ + " should succeed");
+ }
}
broker2Send1.join();
broker2Send2.join();
@@ -153,29 +201,63 @@ public class ZkSessionExpireTest extends
NetworkErrorTestBase {
// Broker rebuild ZK session.
metadataZKProxy.unRejectAllConnections();
Awaitility.await().untilAsserted(() -> {
- assertEquals(getAvailableBrokers(pulsar1).size(), 2);
- assertEquals(getAvailableBrokers(pulsar2).size(), 2);
+ Set<String> availableBrokers1 = getAvailableBrokers(pulsar1);
+ Set<String> availableBrokers2 = getAvailableBrokers(pulsar1);
+ log.info("Available brokers 1: {}", availableBrokers1);
+ log.info("Available brokers 2: {}", availableBrokers2);
+ assertEquals(availableBrokers1.size(), 2);
+ assertEquals(availableBrokers1.size(), 2);
});
// Verify: the topic on broker-1 will be unloaded.
// Verify: the topic on broker-2 is fine.
- Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
- CompletableFuture<Optional<Topic>> future =
pulsar1.getBrokerService().getTopic(topicName, false);
+ Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() ->
{
+ CompletableFuture<Optional<Topic>> future1 =
pulsar1.getBrokerService().getTopic(topicName, false);
log.info("broker 1 topics {}",
pulsar1.getBrokerService().getTopics().keySet());
log.info("broker 2 topics {}",
pulsar2.getBrokerService().getTopics().keySet());
- log.info("broker 1 bundles {}",
pulsar1.getNamespaceService().getOwnershipCache().getOwnedBundles()
- .keySet().stream().map(k ->
k.getNamespaceObject().toString() + "/" + k.getBundleRange())
+ log.info("broker 1 bundles {}", getOwnedBundles(pulsar1).stream()
.filter(s ->
s.contains(defaultNamespace)).collect(Collectors.toList()));
- log.info("broker 2 bundles {}",
pulsar2.getNamespaceService().getOwnershipCache().getOwnedBundles()
- .keySet().stream().map(k ->
k.getNamespaceObject().toString() + "/" + k.getBundleRange())
+ log.info("broker 2 bundles {}", getOwnedBundles(pulsar1).stream()
.filter(s ->
s.contains(defaultNamespace)).collect(Collectors.toList()));
- log.info("future: {}, isDone: {}, isCompletedExceptionally: {}",
- future, future == null ? "null" : future.isDone(),
- future, future == null ? "null" :
future.isCompletedExceptionally());
- assertTrue(future == null
+ String lookup1 = admin1.lookups().lookupTopic(topicName);
+ String lookup2 = admin2.lookups().lookupTopic(topicName);
+ log.info("lookup 1: {}", lookup1);
+ log.info("lookup 2: {}", lookup2);
+ // Both responses from different brokers should be the same.
+ assertEquals(lookup1, lookup2, "both lookup result should be the
same");
+ // Except the system topic based load balancer, the topic should
be loaded up on the broker-2.
+ log.info("future 1: {}, isDone: {}, isCompletedExceptionally: {}",
+ future1, future1 == null ? "null" : future1.isDone(),
+ future1 == null ? "null" :
future1.isCompletedExceptionally());
+ boolean topicDoesNotExists1 = future1 == null
||
!pulsar1.getBrokerService().getTopics().containsKey(topicName)
- || (future.isDone() && !future.isCompletedExceptionally()
&& future.get().isEmpty())
- || future.isCompletedExceptionally());
+ || (future1.isDone() &&
!future1.isCompletedExceptionally() && future1.get().isEmpty())
+ || future1.isCompletedExceptionally();
+ if (loadManager.getSimpleName().equals("ModularLoadManagerImpl")
+ || TableViewType.MetadataStore.equals(tableViewType)) {
+ assertNotEquals(lookup1, brokerAddr1, "lookup result should be
broker-2");
+ assertNotEquals(lookup2, brokerAddr1, "lookup result should be
broker-2");
+ assertTrue(topicDoesNotExists1, "topic should be unloaded from
the broker-1");
+ List<String> bundle1 = getOwnedBundles(pulsar1).stream()
+ .filter(s ->
s.contains(defaultNamespace)).collect(Collectors.toList());
+ List<String> bundle2 = getOwnedBundles(pulsar2).stream()
+ .filter(s ->
s.contains(defaultNamespace)).collect(Collectors.toList());
+ log.info("broker 1 bundles the second time {}", bundle1);
+ log.info("broker 2 bundles the second time {}", bundle2);
+ assertEquals(bundle1.size(), 0);
+ assertEquals(bundle2.size(), 1);
+ if (TableViewType.MetadataStore.equals(tableViewType)) {
+
assertEquals(getMetadataStoreBasedInternalOwnedBundles(pulsar1,
brokerAddr1).stream()
+ .filter(s ->
s.contains(defaultNamespace)).collect(Collectors.toList()).size(), 0);
+
assertEquals(getMetadataStoreBasedInternalOwnedBundles(pulsar2,
lookup2).stream()
+ .filter(s ->
s.contains(defaultNamespace)).collect(Collectors.toList()).size(), 0);
+ }
+ }
+ // Regarding the system topic based load balancer, which does not
rely on ZK, the topic may be owned by any
+ // broker.
+ if (TableViewType.SystemTopic.equals(tableViewType)) {
+ // This implementation will be finished in a separate PR.
+ }
});
Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName,
false).join().get();
assertNotNull(broker2Topic3);
@@ -183,6 +265,9 @@ public class ZkSessionExpireTest extends
NetworkErrorTestBase {
// Send messages continuously.
// Verify: p1.send will success(it will connect to broker-2).
// Verify: p2.send will success.
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(broker2Topic3.getProducers().size(), 2);
+ });
CompletableFuture<MessageId> broker1Send3 =
p1.sendAsync("broker1_msg3");
CompletableFuture<MessageId> broker2Send3 =
p2.sendAsync("broker2_msg3");
broker1Send3.join();
@@ -196,4 +281,19 @@ public class ZkSessionExpireTest extends
NetworkErrorTestBase {
p2.close();
admin2.topics().delete(topicName, false);
}
+
+ private Collection<String>
getMetadataStoreBasedInternalOwnedBundles(PulsarService pulsar, String
brokerUrl) {
+ Object loadManagerWrapper = pulsar.getLoadManager().get();
+ ExtensibleLoadManagerImpl loadManager =
WhiteboxImpl.getInternalState(loadManagerWrapper, "loadManager");
+ ServiceUnitStateChannel serviceUnitStateChannel =
loadManager.getServiceUnitStateChannel();
+ ServiceUnitStateTableView tableview =
WhiteboxImpl.getInternalState(serviceUnitStateChannel, "tableview");
+ MetadataStoreTableView<ServiceUnitStateData> tableviewInternal =
WhiteboxImpl.getInternalState(tableview, "tableview");
+ ConcurrentMap<String, ServiceUnitStateData> data =
WhiteboxImpl.getInternalState(tableviewInternal, "data");
+ return data.entrySet().stream().filter(e -> {
+ if (brokerUrl.equals(e.getValue().dstBroker()) &&
e.getValue().state().toString().equals("Owned")) {
+ return true;
+ }
+ return false;
+ }).map(e -> e.getKey()).collect(Collectors.toList());
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
index a2a5e4937a1..c06fbe3cc07 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.metadata.tableview.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -29,16 +30,19 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
@@ -48,12 +52,14 @@ import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.jspecify.annotations.Nullable;
@Slf4j
public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T> {
private static final int FILL_TIMEOUT_IN_MILLIS = 300_000;
- private static final int MAX_CONCURRENT_METADATA_OPS_DURING_FILL = 50;
private static final long CACHE_REFRESH_FREQUENCY_IN_MILLIS = 600_000;
private final ConcurrentMap<String, T> data;
private final Map<String, T> immutableData;
@@ -64,8 +70,26 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
private final BiPredicate<T, T> conflictResolver;
private final List<BiConsumer<String, T>> tailItemListeners;
private final List<BiConsumer<String, T>> existingItemListeners;
+ private final List<BiConsumer<String, T>> outdatedItemListeners;
+ private final boolean clearUntrustedData;
private final long timeoutInMillis;
private final String pathPrefix;
+ private final Consumer<Throwable> tableViewShutDownListener;
+
+ @Deprecated
+ @Builder
+ public MetadataStoreTableViewImpl(@NonNull Class<T> clazz,
+ @NonNull String name,
+ @NonNull MetadataStore store,
+ @NonNull String pathPrefix,
+ @NonNull BiPredicate<T, T>
conflictResolver,
+ Predicate<String> listenPathValidator,
+ List<BiConsumer<String, T>>
tailItemListeners,
+ List<BiConsumer<String, T>>
existingItemListeners,
+ long timeoutInMillis) {
+ this(clazz, name, store, pathPrefix, conflictResolver,
listenPathValidator, tailItemListeners,
+ existingItemListeners, null, false, timeoutInMillis, null);
+ }
/**
* Construct MetadataStoreTableViewImpl.
@@ -79,7 +103,20 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
* @param tailItemListeners listener for tail item(recently updated)
notifications
* @param existingItemListeners listener for existing items in metadata
store
* @param timeoutInMillis timeout duration for each sync operation.
- * @throws MetadataStoreException if init fails.
+ * @param outdatedItemListeners listeners notified when cached items can no
longer be trusted. The table view
+ * keeps element values correct as follows: 1. It tries its best to
keep the data always up to date. 2. If
+ * you do get an old value, you will receive an update event later,
so the data is eventually accurate.
+ * 3. You will receive this notification when the first two cannot
be guaranteed
+ * due to the expiration of the metadata store session of the table
view. After that, you will also receive
+ * {@code tailItemListeners} and {@code existingItemListeners}
notifications, indicating that the table
+ * view can once again ensure that the first two work properly.
+ * @param clearUntrustedData whether to clear an item once it has been
reported to the {@code outdatedItemListeners}.
+ * In other words, the view drops untrusted values rather than
serving them. You will not get a
+ * {@code tailItemListeners} notification for this change, which
sets the cached value to null.
+ * @param tableViewShutDownListener invoked when the table view still cannot
guarantee that the
+ * data is up to date after making its best effort.
It is recommended to restart the table
+ * view at this time. This event can only happen after the {@code
outdatedItemListeners} are notified, which means it is
+ * disabled if {@code outdatedItemListeners} is null or empty.
*/
@Builder
public MetadataStoreTableViewImpl(@NonNull Class<T> clazz,
@@ -90,7 +127,10 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
Predicate<String> listenPathValidator,
List<BiConsumer<String, T>>
tailItemListeners,
List<BiConsumer<String, T>>
existingItemListeners,
- long timeoutInMillis) {
+ @Nullable List<BiConsumer<String, T>>
outdatedItemListeners,
+ boolean clearUntrustedData,
+ long timeoutInMillis,
+ @Nullable Consumer<Throwable>
tableViewShutDownListener) {
this.name = name;
this.data = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
@@ -105,8 +145,14 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
if (existingItemListeners != null) {
this.existingItemListeners.addAll(existingItemListeners);
}
+ this.outdatedItemListeners = new ArrayList<>();
+ if (outdatedItemListeners != null) {
+ this.outdatedItemListeners.addAll(outdatedItemListeners);
+ }
+ this.clearUntrustedData = clearUntrustedData;
this.timeoutInMillis = timeoutInMillis;
this.store = store;
+ this.tableViewShutDownListener = tableViewShutDownListener;
this.cache = store.getMetadataCache(clazz,
MetadataCacheConfig.<T>builder()
.expireAfterWriteMillis(-1)
@@ -115,6 +161,12 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
.asyncReloadConsumer(this::consumeAsyncReload)
.build());
store.registerListener(this::handleNotification);
+ if (store instanceof AbstractMetadataStore abstractMetadataStore) {
+
abstractMetadataStore.registerSessionListener(this::handleSessionEvent);
+ } else {
+ // Since ServiceUnitStateMetadataStoreTableViewImpl has already checked
the configuration named
+ // "zookeeperSessionExpiredPolicy", skip printing a duplicate
log here.
+ }
}
public void start() throws MetadataStoreException {
@@ -159,6 +211,52 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
return updated.booleanValue();
}
+ public void handleSessionEvent(SessionEvent sessionEvent) {
+ if (CollectionUtils.isEmpty(outdatedItemListeners)) {
+ log.warn("{} Skipped handle metadata store session event {}
because does not set itemOutdatedListeners",
+ name, sessionEvent);
+ return;
+ }
+ if (sessionEvent == SessionEvent.SessionLost) {
+ Map<String, T> snapshot = new HashMap<>(data);
+ log.warn("{} clearing owned bundles because metadata store session
lost {}", name, snapshot);
+ for (Map.Entry<String, T> entry : snapshot.entrySet()) {
+ for (var listener : outdatedItemListeners) {
+ try {
+ listener.accept(entry.getKey(), entry.getValue());
+ if (clearUntrustedData) {
+ data.remove(entry.getKey());
+ }
+ } catch (Throwable e) {
+ if (tableViewShutDownListener == null) {
+ log.warn("{} failed to listen item whose state is
unknown because of metadata store"
+ + " session lost. key:{}, val:{}", name,
entry.getKey(), entry.getValue(), e);
+ } else {
+ log.warn("{} Shutdown table view, because failed
to listen item whose state is unknown due"
+ + " to metadata store session lost.
key:{}, val:{}", name, entry.getKey(),
+ entry.getValue(), e);
+ tableViewShutDownListener.accept(e);
+ }
+ break;
+ }
+ }
+ }
+ } else if (sessionEvent == SessionEvent.SessionReestablished) {
+ log.info("{} Refilling bundle owner list after metadata store
session reestablished", name);
+ // The session has just expired, so print detailed logs while
re-initializing.
+ fillAsync(null, true).exceptionally(ex -> {
+ if (tableViewShutDownListener == null) {
+ log.warn("{} failed to fill existing items after session
reestablished", name, ex);
+ } else {
+ log.error("{} Shutdown table view because failed to fill
existing items after session"
+ + " reestablished", name, ex);
+ tableViewShutDownListener.accept(ex);
+ }
+ return null;
+ });
+ }
+ }
+
private void handleTailItem(String key, T val) {
if (updateData(key, val)) {
if (log.isDebugEnabled()) {
@@ -234,51 +332,63 @@ public class MetadataStoreTableViewImpl<T> implements
MetadataStoreTableView<T>
});
}
+ /**
+ * Note: this method should only be called while a broker is starting; please call
{@link #fillAsync(AtomicLong, boolean)}
+ * for any other use case, otherwise you may hit a thread deadlock.
+ */
private void fill() throws MetadataStoreException {
- final var deadline = System.currentTimeMillis() +
FILL_TIMEOUT_IN_MILLIS;
- log.info("{} start filling existing items under the pathPrefix:{}",
name, pathPrefix);
- ConcurrentLinkedDeque<String> q = new ConcurrentLinkedDeque<>();
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- q.add(pathPrefix);
- LongAdder count = new LongAdder();
- while (!q.isEmpty()) {
- var now = System.currentTimeMillis();
- if (now >= deadline) {
- String err = name + " failed to fill existing items in "
- +
TimeUnit.MILLISECONDS.toSeconds(FILL_TIMEOUT_IN_MILLIS) + " secs. Filled count:"
- + count.sum();
- log.error(err);
- throw new MetadataStoreException(err);
- }
- int size = Math.min(MAX_CONCURRENT_METADATA_OPS_DURING_FILL,
q.size());
- for (int i = 0; i < size; i++) {
- String path = q.poll();
- futures.add(store.getChildren(path)
- .thenCompose(children -> {
- // The path is leaf
- if (children.isEmpty()) {
- count.increment();
- return handleExisting(path);
- } else {
- for (var child : children) {
- q.add(path + "/" + child);
- }
- return CompletableFuture.completedFuture(null);
- }
- }));
- }
- try {
- FutureUtil.waitForAll(futures).get(
- Math.min(timeoutInMillis, deadline - now),
- TimeUnit.MILLISECONDS);
- } catch (Throwable e) {
- Throwable c = FutureUtil.unwrapCompletionException(e);
- log.error("{} failed to fill existing items", name, c);
- throw new MetadataStoreException(c);
- }
- futures.clear();
+ AtomicLong loadedCounter = new AtomicLong();
+ long maxWaitTime = Math.min(timeoutInMillis, FILL_TIMEOUT_IN_MILLIS);
+ try {
+ fillAsync(loadedCounter, false).get(maxWaitTime,
TimeUnit.MILLISECONDS);
+ log.info("{} completed filling existing items with size:{}", name,
loadedCounter.get());
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ String err = name + " failed to fill existing items in "
+ + TimeUnit.MILLISECONDS.toSeconds(maxWaitTime) + " secs.
Filled count:"
+ + loadedCounter.get();
+ log.error(err);
+ throw new MetadataStoreException(err,
FutureUtil.unwrapCompletionException(e));
}
- log.info("{} completed filling existing items with size:{}", name,
count.sum());
+ }
+
+ private CompletableFuture<Void> handleExistingLeafs(String rootPath,
String path, @Nullable AtomicLong count,
+ boolean printDetails) {
+ return store.getChildren(path).thenCompose(children -> {
+ if (children.isEmpty()) {
+ // Skip root path.
+ if (rootPath.equals(path)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // Leaf node.
+ if (count != null) {
+ count.incrementAndGet();
+ }
+ if (printDetails) {
+ log.info("handling exist leaf {}", path);
+ }
+ return handleExisting(path);
+ } else {
+ // Dir node.
+ // No limit on the number of concurrent reads is
needed, because the metadata store
+ // client works in a single thread and packs all operations
into a batch; the more operations, the faster.
+ // If the metadata store receives too many requests, it will split
them into multiple requests itself.
+ if (printDetails) {
+ log.info("handling exist dir {}", path);
+ }
+ List<CompletableFuture<Void>> futureList = new ArrayList<>();
+ for (var child : children) {
+ futureList.add(handleExistingLeafs(rootPath, path + "/" +
child, count, printDetails));
+ }
+ return FutureUtil.waitForAll(futureList);
+ }
+ });
+ }
+
+ /**
+ * @param loadedCounter an optional counter of how many values have been
loaded; for internal use only.
+ */
+ private CompletableFuture<Void> fillAsync(@Nullable AtomicLong
loadedCounter, boolean printDetails) {
+ return handleExistingLeafs(pathPrefix, pathPrefix, loadedCounter,
printDetails);
}