Technoboy- commented on code in PR #21117:
URL: https://github.com/apache/pulsar/pull/21117#discussion_r1315479064
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/policies/TableViewTopicPolicyService.java:
##########
@@ -0,0 +1,361 @@
+package org.apache.pulsar.broker.policies;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
+import org.apache.pulsar.broker.service.TopicPolicyListener;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.events.ActionType;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.events.PulsarEvent;
+import org.apache.pulsar.common.events.TopicPoliciesEvent;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
+
+@Slf4j
+public class TableViewTopicPolicyService implements TopicPoliciesService {
+ private final String clusterName;
+ private final NamespaceService namespaceService;
+ private final Map<NamespaceName,
CompletableFuture<TableView<PulsarEvent>>> views;
+ private final Map<NamespaceName, CompletableFuture<Producer<PulsarEvent>>>
writers;
+ private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>>
listeners;
+ private final Supplier<PulsarClient> clientSupplier;
+
+ public TableViewTopicPolicyService(@Nonnull PulsarService pulsarService) {
+ requireNonNull(pulsarService);
+ this.clientSupplier = () -> {
+ try {
+ return pulsarService.getClient();
+ } catch (Throwable ex) {
+ throw new RuntimeException(ex);
+ }
+ };
+ this.namespaceService = pulsarService.getNamespaceService();
+ this.views = new ConcurrentHashMap<>();
+ this.writers = new ConcurrentHashMap<>();
+ this.listeners = new ConcurrentHashMap<>();
+ this.clusterName = pulsarService.getConfiguration().getClusterName();
+ }
+
+ @Override
+ public @Nonnull CompletableFuture<Void> deleteTopicPoliciesAsync(@Nonnull
TopicName topicName) {
+ return updateTopicPoliciesAsync(topicName, null);
+ }
+
+ @Override
+ public @Nonnull CompletableFuture<Void> updateTopicPoliciesAsync(@Nonnull
TopicName topicName,
+ @Nullable
TopicPolicies policies) {
+ if (topicName == null) {
+ return failedFuture(new NullPointerException());
+ }
+ final var ns = topicName.getNamespaceObject();
+ if (NamespaceService.isHeartbeatNamespace(ns) ||
SystemTopicNames.isSystemTopic(topicName)) {
+ return completedFuture(null);
+ }
+ final CompletableFuture<Void> updateFuture = getOrInitWriterAsync(ns)
+ .thenCompose(writer -> {
+ final var key = topicName.getPartitionedTopicName();
+ if (policies == null) {
+ return
writer.newMessage().key(key).value(null).sendAsync();
+ }
+ final var builder = PulsarEvent.builder();
+ if (!policies.isGlobalPolicies()) {
+ // we don't need to replicate local policies to remote
cluster, so set `replicateTo` to empty.
+ builder.replicateTo(Sets.newHashSet(clusterName));
+ }
+ final var event = builder
+ .actionType(ActionType.UPDATE)
+ .eventType(EventType.TOPIC_POLICY)
+ .topicPoliciesEvent(
+ TopicPoliciesEvent.builder()
+
.domain(topicName.getDomain().toString())
+ .tenant(topicName.getTenant())
+
.namespace(topicName.getNamespaceObject().getLocalName())
+
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
+ .policies(policies)
+ .build())
+ .build();
+ return
writer.newMessage().key(key).value(event).sendAsync();
+ }).thenApply(__ -> null);
+ updateFuture.exceptionally(ex -> {
+ // auto-recovery
+ cleanupWriter(ns).exceptionally(innerEx -> {
+ log.warn("Exception occur while trying auto-cleanup bad status
writer. namespace: {}", ns, ex);
+ return null;
+ });
+ return null;
+ });
+ return updateFuture;
+ }
+
+ @Override
+ public @Nullable TopicPolicies getTopicPolicies(@Nonnull TopicName
topicName)
+ throws BrokerServiceException.TopicPoliciesCacheNotInitException {
+ requireNonNull(topicName);
+ return getTopicPolicies(topicName, false);
+ }
+
+ @Override
+ public @Nullable TopicPolicies getTopicPoliciesIfExists(@Nonnull TopicName
topicName) {
+ requireNonNull(topicName);
+ final var policiesFuture = getTopicPoliciesAsync(topicName, false);
+ if (!policiesFuture.isDone() ||
policiesFuture.isCompletedExceptionally()) {
+ return null;
+ }
+ return policiesFuture.join().orElse(null);
+ }
+
+ @Override
+ public @Nullable TopicPolicies getTopicPolicies(@Nonnull TopicName
topicName, boolean isGlobal)
+ throws BrokerServiceException.TopicPoliciesCacheNotInitException {
+ final var policiesFuture = getTopicPoliciesAsync(topicName, isGlobal);
+ // using retry to implement async-like logic
+ if (!policiesFuture.isDone() ||
policiesFuture.isCompletedExceptionally()) {
+ throw new
BrokerServiceException.TopicPoliciesCacheNotInitException();
+ }
+ return policiesFuture.join().orElse(null);
+ }
+ @Override
+ public CompletableFuture<TopicPolicies>
getTopicPoliciesBypassCacheAsync(@Nonnull TopicName topicName) {
+ if (topicName == null) {
+ return failedFuture(new NullPointerException());
+ }
+ return getTopicPoliciesAsync(topicName, false)
+ .thenApply(topicPolicies -> topicPolicies.orElse(null));
+ }
+
+ @Override
+ public @Nonnull CompletableFuture<Optional<TopicPolicies>>
getTopicPoliciesAsync(@Nonnull TopicName topicName,
+
boolean isGlobal) {
+ final var ns = topicName.getNamespaceObject();
+ if (NamespaceService.isHeartbeatNamespace(ns) ||
SystemTopicNames.isSystemTopic(topicName)) {
+ return completedFuture(Optional.empty());
+ }
+ final var viewFuture = getOrInitViewAsync(ns);
+ final CompletableFuture<Optional<TopicPolicies>> policiesFuture =
viewFuture.thenApply(view -> {
+ final var event = view.get(topicName.getPartitionedTopicName());
+ if (event == null || event.getEventType() !=
EventType.TOPIC_POLICY) {
+ return Optional.empty();
+ }
+ final var topicPoliciesEvent = event.getTopicPoliciesEvent();
+ final TopicPolicies policies;
+ if (topicPoliciesEvent == null || (policies =
topicPoliciesEvent.getPolicies()) == null) {
+ return Optional.empty();
+ }
+ if (policies.isGlobalPolicies() != isGlobal) {
+ return Optional.empty();
+ }
+ return Optional.of(policies);
+ });
+ policiesFuture.exceptionally(ex -> {
+ cleanupView(ns).exceptionally(innerEx -> {
+ log.warn("Exception occur while trying auto-cleanup bad status
view. namespace: {}", ns, ex);
+ return null;
+ });
+ return Optional.empty();
+ });
+ return policiesFuture;
+ }
+
+ @Override
+ public @Nonnull CompletableFuture<Void>
addOwnedNamespaceBundleAsync(@Nonnull NamespaceBundle bundle) {
+ if (bundle == null) {
+ return failedFuture(new NullPointerException());
+ }
+ final var ns = bundle.getNamespaceObject();
+
+ if (NamespaceService.isHeartbeatNamespace(ns)) {
+ return completedFuture(null);
+ }
+ return getOrInitViewAsync(ns).thenApply(__ -> null);
+ }
+
+ @Override
+ public @Nonnull CompletableFuture<Void>
removeOwnedNamespaceBundleAsync(@Nonnull NamespaceBundle bundle) {
+ final var ns = bundle.getNamespaceObject();
+ return namespaceService.getOwnedBundles(ns)
+ .thenCompose(ownedBundles -> {
+ if (!ownedBundles.isEmpty()) {
+ // don't need to clean up view
+ return completedFuture(null);
+ }
+ final List<CompletableFuture<Void>> container = new
ArrayList<>(2);
+ container.add(cleanupView(ns));
+ container.add(cleanupWriter(ns));
+ return CompletableFuture.allOf(container.toArray(new
CompletableFuture[]{}));
+ });
+ }
+
+ @Override
+ public void start() {
+ namespaceService.addNamespaceBundleOwnershipListener(
+ new NamespaceBundleOwnershipListener() {
+ @Override
+ public void onLoad(NamespaceBundle bundle) {
+ // switch thread to avoid deadlock causing topic load
timeout
+ supplyAsync(() -> null)
+ .thenCompose(__ ->
addOwnedNamespaceBundleAsync(bundle))
+ .exceptionally(ex -> {
+ log.warn("Exception occur while trying
init "
+ + "topic policies by namespace
onload event. namespace: {}",
+ bundle.getNamespaceObject(), ex);
+ return null;
+ });
+
+ }
+
+ @Override
+ public void unLoad(NamespaceBundle bundle) {
+ // switch thread to avoid deadlock causing topic load
timeout
+ supplyAsync(() -> null)
+ .thenCompose(__ ->
removeOwnedNamespaceBundleAsync(bundle))
+ .exceptionally(ex -> {
+ log.warn("Exception occur while trying
cleanup "
+ + "topic policies by namespace
unloading event. namespace: {}",
+ bundle.getNamespaceObject(), ex);
+ return null;
+ });
+ }
+
+ @Override
+ public boolean test(NamespaceBundle namespaceBundle) {
+ return true;
+ }
+ });
+ }
+
+ @Override
+ public void registerListener(@Nonnull TopicName topicName, @Nonnull
TopicPolicyListener<TopicPolicies> listener) {
+ requireNonNull(topicName);
+ requireNonNull(listener);
+ listeners.computeIfAbsent(topicName, (tp) -> new
CopyOnWriteArrayList<>()).add(listener);
+ }
+
+ @Override
+ public void unregisterListener(@Nonnull TopicName topicName, @Nonnull
TopicPolicyListener<TopicPolicies> listener) {
+ requireNonNull(topicName);
+ requireNonNull(listener);
+ listeners.computeIfPresent(topicName, (tp, listeners) -> {
+ listeners.remove(listener);
+ if (listeners.isEmpty()) { // cleanup the listeners
+ return null;
+ }
+ return listeners;
+ });
+ }
+
+ private void listenerProcessor(@Nonnull String key, @Nullable PulsarEvent
event) {
+ // Note: Table view will call this method internally, If there are
performance problem, we can try
+ // moving the listener out of table view then use multi thread
to invoke callback.
+ try {
+ if (event == null) {
+ return;
+ }
+ if (event.getEventType() != EventType.TOPIC_POLICY) {
+ return;
+ }
+ if (event.getTopicPoliciesEvent() == null) {
+ return;
+ }
+ final var partitionParentTopicName = TopicName.get(key);
+ var listenerList = listeners.get(partitionParentTopicName);
+ if (listenerList == null || listenerList.isEmpty()) {
+ return;
+ }
+ listenerList.forEach(listener -> {
+ try {
+
listener.onUpdate(event.getTopicPoliciesEvent().getPolicies());
+ } catch (Throwable ex) {
+ // avoid listener affect all of listeners
+ log.warn("Error occur while trying callback listener.
event_key: {},"
+ + " event: {} listener :{}", key, event,
listener.toString());
+ }
+ });
+ } catch (Throwable ex) {
+ // avoid listener affect broker
+ log.warn("Error occur while trying callback listener. event_key:
{}, event: {}",
+ key, event);
+ }
+ }
+
+ private @Nonnull CompletableFuture<TableView<PulsarEvent>>
getOrInitViewAsync(@Nonnull NamespaceName ns) {
+ return views.computeIfAbsent(ns, (namespace) ->
+
clientSupplier.get().newTableView(Schema.AVRO(PulsarEvent.class))
+ .topic(getEventTopic(namespace))
+ .createAsync()
+ .thenApply(view -> {
+ view.forEachAndListen(this::listenerProcessor);
+ return view;
+ }));
+ }
+
+ private @Nonnull CompletableFuture<Producer<PulsarEvent>>
getOrInitWriterAsync(@Nonnull NamespaceName ns) {
+ return writers.computeIfAbsent(ns, (namespace) ->
+
clientSupplier.get().newProducer(Schema.AVRO(PulsarEvent.class))
+ .topic(getEventTopic(namespace))
+ .enableBatching(false)
+ .createAsync());
+ }
+
+ private @Nonnull CompletableFuture<Void> cleanupView(@Nonnull
NamespaceName ns) {
Review Comment:
why need `@Nonnull` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]