This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11648aa  Notify topic of ledger when compaction finishes (#1188)
11648aa is described below

commit 11648aa65e2393874c94e69d5733846bbc9f22fa
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Feb 8 02:31:41 2018 +0000

    Notify topic of ledger when compaction finishes (#1188)
    
    When the compactor finishes compacting a topic, it send a cumulative
    acknowledgement to the broker, containing the ledger ID that the topic
    was compacted into. Hitherto the broker has done nothing with this
    acknowledgement.
    
    This patch introduces the compacted topic interface, and, for now, a
    dummy implementation of this interface. It also introduces a
    specialization of PersistentSubscription which is created for the
    compactor's subscription. When this subscription receives a cumulative
    acknowledgement, it extracts the ledger id for the acknowledgement
    topics, and notifies the compacted topic interface of this newly
    available compacted ledger. For now, the compacted topic implementation
    does nothing with it. This will be in a later patch.
---
 .../service/persistent/CompactorSubscription.java  | 122 +++++++++++++++++++++
 .../service/persistent/PersistentSubscription.java |  10 +-
 .../broker/service/persistent/PersistentTopic.java |  20 +++-
 .../apache/pulsar/compaction/CompactedTopic.java   |  25 +++++
 .../pulsar/compaction/CompactedTopicImpl.java      |  27 +++++
 .../org/apache/pulsar/compaction/Compactor.java    |   4 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  45 ++++++++
 7 files changed, 244 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
new file mode 100644
index 0000000..dae7c4f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.utils.CopyOnWriteArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+
+public class CompactorSubscription extends PersistentSubscription {
+    private CompactedTopic compactedTopic;
+
+    public CompactorSubscription(PersistentTopic topic, CompactedTopic 
compactedTopic,
+                                 String subscriptionName, ManagedCursor 
cursor) {
+        super(topic, subscriptionName, cursor);
+        
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
+        this.compactedTopic = compactedTopic;
+
+        Map<String, Long> properties = cursor.getProperties();
+        if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) 
{
+            long compactedLedgerId = 
properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
+            compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(),
+                                              compactedLedgerId);
+        }
+    }
+
+    @Override
+    public void acknowledgeMessage(PositionImpl position, AckType ackType, 
Map<String,Long> properties) {
+        checkArgument(ackType == AckType.Cumulative);
+        
checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
+        long compactedLedgerId = 
properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Cumulative ack on compactor subscription {}", 
topicName, subName, position);
+        }
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        cursor.asyncMarkDelete(position, properties, new MarkDeleteCallback() {
+                @Override
+                public void markDeleteComplete(Object ctx) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Mark deleted messages until 
position on compactor subscription {}",
+                                  topicName, subName, position);
+                    }
+                    future.complete(null);
+                }
+
+                @Override
+                public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
+                    // TODO: cut consumer connection on markDeleteFailed
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Failed to mark delete for position 
on compactor subscription {}",
+                                  topicName, subName, ctx, exception);
+                    }
+                }
+            }, null);
+
+        if (topic.getManagedLedger().isTerminated() && 
cursor.getNumberOfEntriesInBacklog() == 0) {
+            // Notify all consumer that the end of topic was reached
+            dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
+        }
+
+        // Once properties have been persisted, we can notify the compacted 
topic to use
+        // the new ledger
+        future.thenAccept((v) -> compactedTopic.newCompactedLedger(position, 
compactedLedgerId));
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(CompactorSubscription.class);
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 823daa2..e742f14 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -59,11 +59,11 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.MoreObjects;
 
 public class PersistentSubscription implements Subscription {
-    private final PersistentTopic topic;
-    private final ManagedCursor cursor;
-    private volatile Dispatcher dispatcher;
-    private final String topicName;
-    private final String subName;
+    protected final PersistentTopic topic;
+    protected final ManagedCursor cursor;
+    protected volatile Dispatcher dispatcher;
+    protected final String topicName;
+    protected final String subName;
 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2945059..57d1d75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
@@ -95,6 +96,9 @@ import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicImpl;
+import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
@@ -154,6 +158,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
+    private final CompactedTopic compactedTopic;
 
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
@@ -201,6 +206,8 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
 
         this.dispatchRateLimiter = new DispatchRateLimiter(this);
 
+        this.compactedTopic = new CompactedTopicImpl();
+
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
@@ -212,7 +219,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
                 // to take care of it
             } else {
                 final String subscriptionName = Codec.decode(cursor.getName());
-                subscriptions.put(subscriptionName, new 
PersistentSubscription(this, subscriptionName, cursor));
+                subscriptions.put(subscriptionName, 
createPersistentSubscription(subscriptionName, cursor));
                 // subscription-cursor gets activated by default: deactivate 
as there is no active subscription right
                 // now
                 subscriptions.get(subscriptionName).deactivateCursor();
@@ -233,6 +240,15 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         }
     }
 
+    private PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor) {
+        checkNotNull(compactedTopic);
+        if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+            return new CompactorSubscription(this, compactedTopic, 
subscriptionName, cursor);
+        } else {
+            return new PersistentSubscription(this, subscriptionName, cursor);
+        }
+    }
+
     @Override
     public void publishMessage(ByteBuf headersAndPayload, PublishContext 
publishContext) {
         if (messageDeduplication.shouldPublishNextMessage(publishContext, 
headersAndPayload)) {
@@ -498,7 +514,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
                 }
 
                 
subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
-                        name -> new 
PersistentSubscription(PersistentTopic.this, subscriptionName, cursor)));
+                        name -> createPersistentSubscription(subscriptionName, 
cursor)));
             }
 
             @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
new file mode 100644
index 0000000..284cdbf
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.bookkeeper.mledger.Position;
+
+public interface CompactedTopic {
+    void newCompactedLedger(Position p, long compactedLedgerId);
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
new file mode 100644
index 0000000..068ab49
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.bookkeeper.mledger.Position;
+
+public class CompactedTopicImpl implements CompactedTopic {
+    @Override
+    public void newCompactedLedger(Position p, long compactedLedgerId) {}
+}
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 62e5fcd..06ed302 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -37,8 +37,8 @@ import org.slf4j.LoggerFactory;
 */
 public abstract class Compactor {
     private static final Logger log = LoggerFactory.getLogger(Compactor.class);
-    private static final String COMPACTION_SUBSCRIPTION = "__compaction";
-    private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = 
"CompactedTopicLedger";
+    public static final String COMPACTION_SUBSCRIPTION = "__compaction";
+    public static final String COMPACTED_TOPIC_LEDGER_PROPERTY = 
"CompactedTopicLedger";
     static BookKeeper.DigestType COMPACTED_TOPIC_LEDGER_DIGEST_TYPE = 
BookKeeper.DigestType.CRC32;
     static byte[] COMPACTED_TOPIC_LEDGER_PASSWORD = "".getBytes(UTF_8);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 29a9f79..f76bd90 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -38,11 +38,14 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.ImmutableMap;
+
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +62,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -75,6 +79,7 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
+import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
@@ -83,12 +88,15 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.ArgumentCaptor;
@@ -749,6 +757,12 @@ public class PersistentTopicTest {
                 return null;
             }
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), anyObject());
+
+        doAnswer((invokactionOnMock) -> {
+                ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
+                    .markDeleteComplete(invokactionOnMock.getArguments()[3]);
+                return null;
+            }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), 
any(MarkDeleteCallback.class), anyObject());
     }
 
     @Test
@@ -978,4 +992,35 @@ public class PersistentTopicTest {
         verify(clientImpl, 
Mockito.times(2)).createProducerAsync(globalTopicName, 
replicator.getProducerConfiguration());
     }
 
+    @Test
+    public void testCompactorSubscription() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        CompactedTopic compactedTopic = mock(CompactedTopic.class);
+        PersistentSubscription sub = new CompactorSubscription(topic, 
compactedTopic,
+                                                               
Compactor.COMPACTION_SUBSCRIPTION,
+                                                               cursorMock);
+        PositionImpl position = new PositionImpl(1, 1);
+        long ledgerId = 0xc0bfefeL;
+        sub.acknowledgeMessage(position, AckType.Cumulative,
+                               
ImmutableMap.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId));
+        verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, 
ledgerId);
+    }
+
+
+    @Test
+    public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
+        long ledgerId = 0xc0bfefeL;
+        Map<String, Long> properties = 
ImmutableMap.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId);
+        PositionImpl position = new PositionImpl(1, 1);
+
+        doAnswer((invokactionOnMock) -> 
properties).when(cursorMock).getProperties();
+        doAnswer((invokactionOnMock) -> 
position).when(cursorMock).getMarkDeletedPosition();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        CompactedTopic compactedTopic = mock(CompactedTopic.class);
+        PersistentSubscription sub = new CompactorSubscription(topic, 
compactedTopic,
+                                                               
Compactor.COMPACTION_SUBSCRIPTION,
+                                                               cursorMock);
+        verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, 
ledgerId);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to