Technoboy- commented on code in PR #21117:
URL: https://github.com/apache/pulsar/pull/21117#discussion_r1315485479
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/policies/TableViewTopicPolicyService.java:
##########
@@ -0,0 +1,361 @@
+package org.apache.pulsar.broker.policies;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPolicyListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.events.ActionType;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.events.PulsarEvent;
+import org.apache.pulsar.common.events.TopicPoliciesEvent;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
+
+@Slf4j
+public class TableViewTopicPolicyService implements TopicPoliciesService {
+ private final String clusterName;
+ private final NamespaceService namespaceService;
+ private final Map<NamespaceName,
CompletableFuture<TableView<PulsarEvent>>> views;
+ private final Map<NamespaceName, CompletableFuture<Producer<PulsarEvent>>>
writers;
+ private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>>
listeners;
+ private final Supplier<PulsarClient> clientSupplier;
+
+ public TableViewTopicPolicyService(@Nonnull PulsarService pulsarService) {
+ requireNonNull(pulsarService);
+ this.clientSupplier = () -> {
+ try {
+ return pulsarService.getClient();
+ } catch (Throwable ex) {
+ throw new RuntimeException(ex);
+ }
+ };
+ this.namespaceService = pulsarService.getNamespaceService();
+ this.views = new ConcurrentHashMap<>();
+ this.writers = new ConcurrentHashMap<>();
+ this.listeners = new ConcurrentHashMap<>();
+ this.clusterName = pulsarService.getConfiguration().getClusterName();
+ }
+
+ /**
+  * Deletes the policies of a topic by delegating to
+  * {@link #updateTopicPoliciesAsync(TopicName, TopicPolicies)} with {@code null} policies.
+  *
+  * @param topicName the topic whose policies should be removed
+  * @return the future returned by the delegated update call
+  */
+ @Override
+ public @Nonnull CompletableFuture<Void> deleteTopicPoliciesAsync(@Nonnull TopicName topicName) {
+     final CompletableFuture<Void> deletion = updateTopicPoliciesAsync(topicName, null);
+     return deletion;
+ }
+
+ /**
+  * Publishes a topic-policy change for {@code topicName} through the namespace's writer.
+  *
+  * <p>When {@code policies} is {@code null} a message with a {@code null} value is sent for
+  * the topic key; otherwise an {@code UPDATE} / {@code TOPIC_POLICY} event carrying the
+  * policies is sent. Heartbeat namespaces and system topics are skipped and complete
+  * immediately.
+  *
+  * <p>If the update fails, the namespace's writer is cleaned up in the background so that a
+  * later call can obtain a fresh one. The returned future still carries the original failure.
+  *
+  * @param topicName the topic to update; a {@code null} value yields a future failed with
+  *                  {@link NullPointerException}
+  * @param policies  the new policies, or {@code null} to remove the existing ones
+  * @return a future completed once the message has been sent
+  */
+ @Override
+ public @Nonnull CompletableFuture<Void> updateTopicPoliciesAsync(@Nonnull TopicName topicName,
+                                                                  @Nullable TopicPolicies policies) {
+     if (topicName == null) {
+         return failedFuture(new NullPointerException());
+     }
+     final var ns = topicName.getNamespaceObject();
+     if (NamespaceService.isHeartbeatNamespace(ns) || SystemTopicNames.isSystemTopic(topicName)) {
+         return completedFuture(null);
+     }
+     final CompletableFuture<Void> updateFuture = getOrInitWriterAsync(ns)
+             .thenCompose(writer -> {
+                 final var key = topicName.getPartitionedTopicName();
+                 if (policies == null) {
+                     return writer.newMessage().key(key).value(null).sendAsync();
+                 }
+                 final var builder = PulsarEvent.builder();
+                 if (!policies.isGlobalPolicies()) {
+                     // Local policies must not be replicated to remote clusters, so restrict
+                     // `replicateTo` to the local cluster only.
+                     builder.replicateTo(Sets.newHashSet(clusterName));
+                 }
+                 final var event = builder
+                         .actionType(ActionType.UPDATE)
+                         .eventType(EventType.TOPIC_POLICY)
+                         .topicPoliciesEvent(
+                                 TopicPoliciesEvent.builder()
+                                         .domain(topicName.getDomain().toString())
+                                         .tenant(topicName.getTenant())
+                                         .namespace(topicName.getNamespaceObject().getLocalName())
+                                         .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
+                                         .policies(policies)
+                                         .build())
+                         .build();
+                 return writer.newMessage().key(key).value(event).sendAsync();
+             }).thenApply(__ -> null);
+     updateFuture.exceptionally(ex -> {
+         // auto-recovery: drop the (possibly broken) writer; report the cleanup failure itself,
+         // not the update failure that triggered it (that one is delivered via updateFuture).
+         cleanupWriter(ns).exceptionally(innerEx -> {
+             log.warn("Exception occur while trying auto-cleanup bad status writer. namespace: {}", ns, innerEx);
+             return null;
+         });
+         return null;
+     });
+     return updateFuture;
+ }
+
+ /**
+  * Returns the non-global policies of {@code topicName}.
+  *
+  * <p>Shorthand for {@link #getTopicPolicies(TopicName, boolean)} with {@code isGlobal == false}.
+  *
+  * @param topicName the topic to look up
+  * @return the policies, or {@code null} when none are found
+  * @throws NullPointerException if {@code topicName} is {@code null}
+  * @throws BrokerServiceException.TopicPoliciesCacheNotInitException propagated from the delegated lookup
+  */
+ @Override
+ public @Nullable TopicPolicies getTopicPolicies(@Nonnull TopicName topicName)
+         throws BrokerServiceException.TopicPoliciesCacheNotInitException {
+     return getTopicPolicies(requireNonNull(topicName), false);
+ }
+
+ /**
+  * Non-throwing lookup of the non-global policies of {@code topicName}.
+  *
+  * <p>Only a lookup future that has already completed normally is read; a pending or failed
+  * lookup yields {@code null}.
+  *
+  * @param topicName the topic to look up
+  * @return the policies, or {@code null} when absent or not (yet) available
+  * @throws NullPointerException if {@code topicName} is {@code null}
+  */
+ @Override
+ public @Nullable TopicPolicies getTopicPoliciesIfExists(@Nonnull TopicName topicName) {
+     requireNonNull(topicName);
+     final var lookup = getTopicPoliciesAsync(topicName, false);
+     if (lookup.isDone() && !lookup.isCompletedExceptionally()) {
+         return lookup.join().orElse(null);
+     }
+     return null;
+ }
+
+ /**
+  * Synchronous lookup of the policies of {@code topicName}.
+  *
+  * <p>The asynchronous lookup is started and read only if it has already completed normally;
+  * otherwise the caller is expected to retry after receiving
+  * {@link BrokerServiceException.TopicPoliciesCacheNotInitException}.
+  *
+  * @param topicName the topic to look up
+  * @param isGlobal  whether global policies ({@code true}) or local policies ({@code false}) are wanted
+  * @return the matching policies, or {@code null} when none are found
+  * @throws BrokerServiceException.TopicPoliciesCacheNotInitException if the lookup is still
+  *         pending or completed exceptionally
+  */
+ @Override
+ public @Nullable TopicPolicies getTopicPolicies(@Nonnull TopicName topicName, boolean isGlobal)
+         throws BrokerServiceException.TopicPoliciesCacheNotInitException {
+     final var lookup = getTopicPoliciesAsync(topicName, isGlobal);
+     // using retry to implement async-like logic
+     final boolean readable = lookup.isDone() && !lookup.isCompletedExceptionally();
+     if (readable) {
+         return lookup.join().orElse(null);
+     }
+     throw new BrokerServiceException.TopicPoliciesCacheNotInitException();
+ }
+ /**
+  * Asynchronously returns the non-global policies of {@code topicName}, unwrapped from the
+  * {@link Optional} produced by {@link #getTopicPoliciesAsync(TopicName, boolean)}.
+  *
+  * <p>NOTE(review): despite the name, this implementation reads through the same
+  * {@code getTopicPoliciesAsync} path as the other lookups.
+  *
+  * @param topicName the topic to look up; a {@code null} value yields a future failed with
+  *                  {@link NullPointerException}
+  * @return a future holding the policies, or {@code null} when none are found
+  */
+ @Override
+ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(@Nonnull TopicName topicName) {
+     if (topicName == null) {
+         return failedFuture(new NullPointerException());
+     }
+     final CompletableFuture<Optional<TopicPolicies>> lookup = getTopicPoliciesAsync(topicName, false);
+     return lookup.thenApply(found -> found.isPresent() ? found.get() : null);
+ }
+
+ /**
+  * Asynchronously looks up the policies of {@code topicName} in the namespace's table view.
+  *
+  * <p>Heartbeat namespaces and system topics always resolve to {@link Optional#empty()}.
+  * For other topics the view entry keyed by the partitioned topic name is read; the result is
+  * empty when there is no entry, the entry is not a {@code TOPIC_POLICY} event, it carries no
+  * policies, or its global flag differs from {@code isGlobal}.
+  *
+  * <p>If the lookup fails, the namespace's view is cleaned up in the background so that a
+  * later call can obtain a fresh one. The returned future still carries the original failure.
+  *
+  * @param topicName the topic to look up
+  * @param isGlobal  whether global policies ({@code true}) or local policies ({@code false}) are wanted
+  * @return a future holding the matching policies, if any
+  */
+ @Override
+ public @Nonnull CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName,
+                                                                                  boolean isGlobal) {
+     final var ns = topicName.getNamespaceObject();
+     if (NamespaceService.isHeartbeatNamespace(ns) || SystemTopicNames.isSystemTopic(topicName)) {
+         return completedFuture(Optional.empty());
+     }
+     final var viewFuture = getOrInitViewAsync(ns);
+     final CompletableFuture<Optional<TopicPolicies>> policiesFuture = viewFuture.thenApply(view -> {
+         final var event = view.get(topicName.getPartitionedTopicName());
+         if (event == null || event.getEventType() != EventType.TOPIC_POLICY) {
+             return Optional.empty();
+         }
+         final var topicPoliciesEvent = event.getTopicPoliciesEvent();
+         final TopicPolicies policies;
+         if (topicPoliciesEvent == null || (policies = topicPoliciesEvent.getPolicies()) == null) {
+             return Optional.empty();
+         }
+         if (policies.isGlobalPolicies() != isGlobal) {
+             return Optional.empty();
+         }
+         return Optional.of(policies);
+     });
+     policiesFuture.exceptionally(ex -> {
+         // auto-recovery: drop the (possibly broken) view; report the cleanup failure itself,
+         // not the lookup failure that triggered it (that one is delivered via policiesFuture).
+         cleanupView(ns).exceptionally(innerEx -> {
+             log.warn("Exception occur while trying auto-cleanup bad status view. namespace: {}", ns, innerEx);
+             return null;
+         });
+         return Optional.empty();
+     });
+     return policiesFuture;
+ }
+
+ /**
+  * Reacts to this broker taking ownership of {@code bundle} by making sure the table view of
+  * the bundle's namespace is initialised.
+  *
+  * <p>Heartbeat namespaces are ignored.
+  *
+  * @param bundle the newly owned bundle; a {@code null} value yields a future failed with
+  *               {@link NullPointerException}
+  * @return a future completed once the namespace's view is available
+  */
+ @Override
+ public @Nonnull CompletableFuture<Void> addOwnedNamespaceBundleAsync(@Nonnull NamespaceBundle bundle) {
+     if (bundle == null) {
+         return failedFuture(new NullPointerException());
+     }
+     final var namespace = bundle.getNamespaceObject();
+     return NamespaceService.isHeartbeatNamespace(namespace)
+             ? completedFuture(null)
+             : getOrInitViewAsync(namespace).thenAccept(view -> { });
+ }
+
+ /**
+  * Reacts to this broker losing ownership of {@code bundle}.
+  *
+  * <p>The namespace's view and writer are cleaned up only when
+  * {@code namespaceService.getOwnedBundles(ns)} reports no bundles for the namespace; while
+  * any bundle is still reported, both are kept.
+  *
+  * @param bundle the bundle that is no longer owned; a {@code null} value yields a future
+  *               failed with {@link NullPointerException}, matching
+  *               {@code addOwnedNamespaceBundleAsync}
+  * @return a future completed once any required cleanup has finished
+  */
+ @Override
+ public @Nonnull CompletableFuture<Void> removeOwnedNamespaceBundleAsync(@Nonnull NamespaceBundle bundle) {
+     if (bundle == null) {
+         return failedFuture(new NullPointerException());
+     }
+     final var ns = bundle.getNamespaceObject();
+     return namespaceService.getOwnedBundles(ns)
+             .thenCompose(ownedBundles -> {
+                 if (!ownedBundles.isEmpty()) {
+                     // other bundles of this namespace are still reported as owned:
+                     // keep the view and the writer
+                     return completedFuture(null);
+                 }
+                 return CompletableFuture.allOf(cleanupView(ns), cleanupWriter(ns));
+             });
+ }
+
+ @Override
+ public void start() {
+ namespaceService.addNamespaceBundleOwnershipListener(
+ new NamespaceBundleOwnershipListener() {
+ @Override
+ public void onLoad(NamespaceBundle bundle) {
+ // switch thread to avoid deadlock causing topic load
timeout
+ supplyAsync(() -> null)
Review Comment:
It would be better to use a dedicated executor here; otherwise it may start many threads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]