This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 11648aa Notify topic of ledger when compaction finishes (#1188) 11648aa is described below commit 11648aa65e2393874c94e69d5733846bbc9f22fa Author: Ivan Kelly <iv...@apache.org> AuthorDate: Thu Feb 8 02:31:41 2018 +0000 Notify topic of ledger when compaction finishes (#1188) When the compactor finishes compacting a topic, it send a cumulative acknowledgement to the broker, containing the ledger ID that the topic was compacted into. Hitherto the broker has done nothing with this acknowledgement. This patch introduces the compacted topic interface, and, for now, a dummy implementation of this interface. It also introduces a specialization of PersistentSubscription which is created for the compactor's subscription. When this subscription receives a cumulative acknowledgement, it extracts the ledger id for the acknowledgement topics, and notifies the compacted topic interface of this newly available compacted ledger. For now, the compacted topic implementation does nothing with it. This will be in a later patch. --- .../service/persistent/CompactorSubscription.java | 122 +++++++++++++++++++++ .../service/persistent/PersistentSubscription.java | 10 +- .../broker/service/persistent/PersistentTopic.java | 20 +++- .../apache/pulsar/compaction/CompactedTopic.java | 25 +++++ .../pulsar/compaction/CompactedTopicImpl.java | 27 +++++ .../org/apache/pulsar/compaction/Compactor.java | 4 +- .../pulsar/broker/service/PersistentTopicTest.java | 45 ++++++++ 7 files changed, 244 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java new file mode 100644 index 0000000..dae7c4f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static com.google.common.base.Preconditions.checkArgument; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; +import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.compaction.CompactedTopic; +import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.utils.CopyOnWriteArrayList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.MoreObjects; + +public class CompactorSubscription extends PersistentSubscription { + private CompactedTopic compactedTopic; + + public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic, + String subscriptionName, ManagedCursor cursor) { + super(topic, subscriptionName, cursor); + checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)); + this.compactedTopic = compactedTopic; + + Map<String, Long> properties = cursor.getProperties(); + if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) { + long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY); + compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(), + compactedLedgerId); + } + } + + @Override + public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) { + checkArgument(ackType == AckType.Cumulative); + checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); + long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY); + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Cumulative ack on compactor subscription {}", topicName, subName, position); + } + CompletableFuture<Void> future = new CompletableFuture<>(); + cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Mark deleted messages until position on compactor subscription {}", + topicName, subName, position); + } + future.complete(null); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + // TODO: cut consumer connection on markDeleteFailed + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Failed to mark delete for position on compactor subscription {}", + topicName, subName, ctx, exception); + } + } + }, null); + + if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) { + // Notify all consumer that the end of topic was reached + dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic); + } + + // Once properties have been persisted, we can notify the compacted topic to use + // the new ledger + future.thenAccept((v) -> compactedTopic.newCompactedLedger(position, compactedLedgerId)); + } + + private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 823daa2..e742f14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -59,11 +59,11 @@ import org.slf4j.LoggerFactory; import com.google.common.base.MoreObjects; public class PersistentSubscription implements Subscription { - private final PersistentTopic topic; - private final ManagedCursor cursor; - private volatile Dispatcher dispatcher; - private final String topicName; - private final String subName; + protected final PersistentTopic topic; + protected final ManagedCursor cursor; + protected volatile Dispatcher dispatcher; + protected final String topicName; + protected final String subName; private static final int FALSE = 0; private static final int TRUE = 1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2945059..57d1d75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @@ -95,6 +96,9 @@ import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.compaction.CompactedTopic; +import org.apache.pulsar.compaction.CompactedTopicImpl; +import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; import org.apache.zookeeper.KeeperException; @@ -154,6 +158,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { public static final int MESSAGE_RATE_BACKOFF_MS = 1000; private final MessageDeduplication messageDeduplication; + private final CompactedTopic compactedTopic; // Whether messages published must be encrypted or not in this topic private volatile boolean isEncryptionRequired = false; @@ -201,6 +206,8 @@ public class PersistentTopic implements Topic, AddEntryCallback { this.dispatchRateLimiter = new DispatchRateLimiter(this); + this.compactedTopic = new CompactedTopicImpl(); + for (ManagedCursor cursor : ledger.getCursors()) { if (cursor.getName().startsWith(replicatorPrefix)) { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); @@ -212,7 +219,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { // to take care of it } else { final String subscriptionName = Codec.decode(cursor.getName()); - subscriptions.put(subscriptionName, new PersistentSubscription(this, subscriptionName, cursor)); + subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor)); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now subscriptions.get(subscriptionName).deactivateCursor(); @@ -233,6 +240,15 @@ public class PersistentTopic implements Topic, AddEntryCallback { } } + private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) { + checkNotNull(compactedTopic); + if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) { + return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); + } else { + return new PersistentSubscription(this, subscriptionName, cursor); + } + } + @Override public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) { if (messageDeduplication.shouldPublishNextMessage(publishContext, headersAndPayload)) { @@ -498,7 +514,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { } subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName, - name -> new PersistentSubscription(PersistentTopic.this, subscriptionName, cursor))); + name -> createPersistentSubscription(subscriptionName, cursor))); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java new file mode 100644 index 0000000..284cdbf --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import org.apache.bookkeeper.mledger.Position; + +public interface CompactedTopic { + void newCompactedLedger(Position p, long compactedLedgerId); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java new file mode 100644 index 0000000..068ab49 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import org.apache.bookkeeper.mledger.Position; + +public class CompactedTopicImpl implements CompactedTopic { + @Override + public void newCompactedLedger(Position p, long compactedLedgerId) {} +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index 62e5fcd..06ed302 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory; */ public abstract class Compactor { private static final Logger log = LoggerFactory.getLogger(Compactor.class); - private static final String COMPACTION_SUBSCRIPTION = "__compaction"; - private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; + public static final String COMPACTION_SUBSCRIPTION = "__compaction"; + public static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; static BookKeeper.DigestType COMPACTED_TOPIC_LEDGER_DIGEST_TYPE = BookKeeper.DigestType.CRC32; static byte[] COMPACTED_TOPIC_LEDGER_PASSWORD = "".getBytes(UTF_8); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 29a9f79..f76bd90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -38,11 +38,14 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.ImmutableMap; + import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.URL; import java.util.ArrayList; import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -59,6 +62,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -75,6 +79,7 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; +import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; @@ -83,12 +88,15 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.compaction.CompactedTopic; +import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; import org.mockito.ArgumentCaptor; @@ -749,6 +757,12 @@ public class PersistentTopicTest { return null; } }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject()); + + doAnswer((invokactionOnMock) -> { + ((MarkDeleteCallback) invokactionOnMock.getArguments()[2]) + .markDeleteComplete(invokactionOnMock.getArguments()[3]); + return null; + }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(MarkDeleteCallback.class), anyObject()); } @Test @@ -978,4 +992,35 @@ public class PersistentTopicTest { verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); } + @Test + public void testCompactorSubscription() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + CompactedTopic compactedTopic = mock(CompactedTopic.class); + PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic, + Compactor.COMPACTION_SUBSCRIPTION, + cursorMock); + PositionImpl position = new PositionImpl(1, 1); + long ledgerId = 0xc0bfefeL; + sub.acknowledgeMessage(position, AckType.Cumulative, + ImmutableMap.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId)); + verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); + } + + + @Test + public void testCompactorSubscriptionUpdatedOnInit() throws Exception { + long ledgerId = 0xc0bfefeL; + Map<String, Long> properties = ImmutableMap.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId); + PositionImpl position = new PositionImpl(1, 1); + + doAnswer((invokactionOnMock) -> properties).when(cursorMock).getProperties(); + doAnswer((invokactionOnMock) -> position).when(cursorMock).getMarkDeletedPosition(); + + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + CompactedTopic compactedTopic = mock(CompactedTopic.class); + PersistentSubscription sub = new CompactorSubscription(topic, compactedTopic, + Compactor.COMPACTION_SUBSCRIPTION, + cursorMock); + verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); + } } -- To stop receiving notification emails like this one, please contact mme...@apache.org.