Technoboy- commented on code in PR #21117:
URL: https://github.com/apache/pulsar/pull/21117#discussion_r1315485479


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/policies/TableViewTopicPolicyService.java:
##########
@@ -0,0 +1,361 @@
+package org.apache.pulsar.broker.policies;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPolicyListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.events.ActionType;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.events.PulsarEvent;
+import org.apache.pulsar.common.events.TopicPoliciesEvent;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
+
+@Slf4j
+public class TableViewTopicPolicyService implements TopicPoliciesService {
+    private final String clusterName;
+    private final NamespaceService namespaceService;
+    private final Map<NamespaceName, 
CompletableFuture<TableView<PulsarEvent>>> views;
+    private final Map<NamespaceName, CompletableFuture<Producer<PulsarEvent>>> 
writers;
+    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> 
listeners;
+    private final Supplier<PulsarClient> clientSupplier;
+
+    public TableViewTopicPolicyService(@Nonnull PulsarService pulsarService) {
+        requireNonNull(pulsarService);
+        this.clientSupplier = () -> {
+            try {
+                return pulsarService.getClient();
+            } catch (Throwable ex) {
+                throw new RuntimeException(ex);
+            }
+        };
+        this.namespaceService = pulsarService.getNamespaceService();
+        this.views = new ConcurrentHashMap<>();
+        this.writers = new ConcurrentHashMap<>();
+        this.listeners = new ConcurrentHashMap<>();
+        this.clusterName = pulsarService.getConfiguration().getClusterName();
+    }
+
+    @Override
+    public @Nonnull CompletableFuture<Void> deleteTopicPoliciesAsync(@Nonnull 
TopicName topicName) {
+        return updateTopicPoliciesAsync(topicName, null);
+    }
+
+    @Override
+    public @Nonnull CompletableFuture<Void> updateTopicPoliciesAsync(@Nonnull 
TopicName topicName,
+                                                                     @Nullable 
TopicPolicies policies) {
+        if (topicName == null) {
+            return failedFuture(new NullPointerException());
+        }
+        final var ns = topicName.getNamespaceObject();
+        if (NamespaceService.isHeartbeatNamespace(ns) || 
SystemTopicNames.isSystemTopic(topicName)) {
+            return completedFuture(null);
+        }
+        final CompletableFuture<Void> updateFuture = getOrInitWriterAsync(ns)
+                .thenCompose(writer -> {
+                    final var key = topicName.getPartitionedTopicName();
+                    if (policies == null) {
+                        return 
writer.newMessage().key(key).value(null).sendAsync();
+                    }
+                    final var builder = PulsarEvent.builder();
+                    if (!policies.isGlobalPolicies()) {
+                        // we don't need to replicate local policies to remote 
cluster, so set `replicateTo` to empty.
+                        builder.replicateTo(Sets.newHashSet(clusterName));
+                    }
+                    final var event = builder
+                            .actionType(ActionType.UPDATE)
+                            .eventType(EventType.TOPIC_POLICY)
+                            .topicPoliciesEvent(
+                                    TopicPoliciesEvent.builder()
+                                            
.domain(topicName.getDomain().toString())
+                                            .tenant(topicName.getTenant())
+                                            
.namespace(topicName.getNamespaceObject().getLocalName())
+                                            
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
+                                            .policies(policies)
+                                            .build())
+                            .build();
+                    return 
writer.newMessage().key(key).value(event).sendAsync();
+                }).thenApply(__ -> null);
+        updateFuture.exceptionally(ex -> {
+            // auto-recovery
+            cleanupWriter(ns).exceptionally(innerEx -> {
+                log.warn("Exception occur while trying auto-cleanup bad status 
writer. namespace: {}", ns, ex);
+                return null;
+            });
+            return null;
+        });
+        return updateFuture;
+    }
+
+    @Override
+    public @Nullable TopicPolicies getTopicPolicies(@Nonnull TopicName 
topicName)
+            throws BrokerServiceException.TopicPoliciesCacheNotInitException {
+        requireNonNull(topicName);
+        return getTopicPolicies(topicName, false);
+    }
+
+    @Override
+    public @Nullable TopicPolicies getTopicPoliciesIfExists(@Nonnull TopicName 
topicName) {
+        requireNonNull(topicName);
+        final var policiesFuture = getTopicPoliciesAsync(topicName, false);
+        if (!policiesFuture.isDone() || 
policiesFuture.isCompletedExceptionally()) {
+            return null;
+        }
+        return policiesFuture.join().orElse(null);
+    }
+
+    @Override
+    public @Nullable TopicPolicies getTopicPolicies(@Nonnull TopicName 
topicName, boolean isGlobal)
+            throws BrokerServiceException.TopicPoliciesCacheNotInitException {
+        final var policiesFuture = getTopicPoliciesAsync(topicName, isGlobal);
+        // using retry to implement async-like logic
+        if (!policiesFuture.isDone() || 
policiesFuture.isCompletedExceptionally()) {
+            throw new 
BrokerServiceException.TopicPoliciesCacheNotInitException();
+        }
+        return policiesFuture.join().orElse(null);
+    }
+    @Override
+    public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(@Nonnull TopicName topicName) {
+        if (topicName == null) {
+            return failedFuture(new NullPointerException());
+        }
+        return getTopicPoliciesAsync(topicName, false)
+                .thenApply(topicPolicies -> topicPolicies.orElse(null));
+    }
+
+    @Override
+    public @Nonnull CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsync(@Nonnull TopicName topicName,
+                                                                               
      boolean isGlobal) {
+        final var ns = topicName.getNamespaceObject();
+        if (NamespaceService.isHeartbeatNamespace(ns) || 
SystemTopicNames.isSystemTopic(topicName)) {
+            return completedFuture(Optional.empty());
+        }
+        final var viewFuture = getOrInitViewAsync(ns);
+        final CompletableFuture<Optional<TopicPolicies>> policiesFuture = 
viewFuture.thenApply(view -> {
+            final var event = view.get(topicName.getPartitionedTopicName());
+            if (event == null || event.getEventType() != 
EventType.TOPIC_POLICY) {
+                return Optional.empty();
+            }
+            final var topicPoliciesEvent = event.getTopicPoliciesEvent();
+            final TopicPolicies policies;
+            if (topicPoliciesEvent == null || (policies = 
topicPoliciesEvent.getPolicies()) == null) {
+                return Optional.empty();
+            }
+            if (policies.isGlobalPolicies() != isGlobal) {
+                return Optional.empty();
+            }
+            return Optional.of(policies);
+        });
+        policiesFuture.exceptionally(ex -> {
+            cleanupView(ns).exceptionally(innerEx -> {
+                log.warn("Exception occur while trying auto-cleanup bad status 
view. namespace: {}", ns, ex);
+                return null;
+            });
+            return Optional.empty();
+        });
+        return policiesFuture;
+    }
+
+    @Override
+    public @Nonnull CompletableFuture<Void> 
addOwnedNamespaceBundleAsync(@Nonnull NamespaceBundle bundle) {
+        if (bundle == null) {
+            return failedFuture(new NullPointerException());
+        }
+        final var ns = bundle.getNamespaceObject();
+
+        if (NamespaceService.isHeartbeatNamespace(ns)) {
+            return completedFuture(null);
+        }
+        return getOrInitViewAsync(ns).thenApply(__ -> null);
+    }
+
+    @Override
+    public @Nonnull CompletableFuture<Void> 
removeOwnedNamespaceBundleAsync(@Nonnull NamespaceBundle bundle) {
+        final var ns = bundle.getNamespaceObject();
+        return namespaceService.getOwnedBundles(ns)
+                .thenCompose(ownedBundles -> {
+                    if (!ownedBundles.isEmpty()) {
+                        // don't need to clean up view
+                        return completedFuture(null);
+                    }
+                    final List<CompletableFuture<Void>> container = new 
ArrayList<>(2);
+                    container.add(cleanupView(ns));
+                    container.add(cleanupWriter(ns));
+                    return CompletableFuture.allOf(container.toArray(new 
CompletableFuture[]{}));
+                });
+    }
+
+    @Override
+    public void start() {
+        namespaceService.addNamespaceBundleOwnershipListener(
+                new NamespaceBundleOwnershipListener() {
+                    @Override
+                    public void onLoad(NamespaceBundle bundle) {
+                        // switch thread to avoid deadlock causing topic load 
timeout
+                        supplyAsync(() -> null)

Review Comment:
   better to use a specified executor, otherwise it may start many threads 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to