This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b46924c3a6e6c2baee9a4dc39f3e8425af33285e Author: Enrico Olivelli <[email protected]> AuthorDate: Fri Jun 3 09:03:05 2022 +0200 Issue 15750: PIP-105: Store Subscription properties (#15757) * PIP-105: Store Subscription properties (cherry picked from commit 23f46a0736e844a2a2fec943ee76d4e1e73ec477) --- .../apache/bookkeeper/mledger/ManagedCursor.java | 15 +++ .../apache/bookkeeper/mledger/ManagedLedger.java | 9 +- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 96 ++++++++++++-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ++- managed-ledger/src/main/proto/MLDataFormats.proto | 9 ++ .../mledger/impl/ManagedCursorContainerTest.java | 5 + .../mledger/impl/ManagedCursorPropertiesTest.java | 62 ++++++++- .../apache/pulsar/broker/service/Subscription.java | 2 + .../nonpersistent/NonPersistentSubscription.java | 13 +- .../service/persistent/PersistentSubscription.java | 21 +++- .../broker/service/persistent/PersistentTopic.java | 17 +-- .../broker/admin/CreateSubscriptionTest.java | 138 ++++++++++++++------- .../pulsar/broker/service/PersistentTopicTest.java | 8 +- .../pulsar/broker/service/ServerCnxTest.java | 10 +- .../broker/service/plugin/FilterEntryTest.java | 8 +- .../offload/jcloud/impl/MockManagedLedger.java | 6 +- 16 files changed, 339 insertions(+), 95 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index efd183d0bfb..46ca0f14003 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Range; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -79,6 +80,20 @@ public interface ManagedCursor { */ Map<String, Long> getProperties(); + /** + * Return any properties that were associated with the cursor. + */ + Map<String, String> getCursorProperties(); + + /** + * Updates the properties. + * @param cursorProperties + * @return a handle to the result of the operation + */ + default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) { + return CompletableFuture.completedFuture(null); + } + /** * Add a property associated with the last stored position. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 1f6e0d3af46..7196a3b4c03 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -240,10 +240,13 @@ public interface ManagedLedger { * @param properties * user defined properties that will be attached to the first position of the cursor, if the open * operation will trigger the creation of the cursor. + * @param cursorProperties + * the properties for the Cursor * @return the ManagedCursor * @throws ManagedLedgerException */ - ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) + ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, + Map<String, String> cursorProperties) throws InterruptedException, ManagedLedgerException; /** @@ -337,13 +340,15 @@ public interface ManagedLedger { * @param initialPosition * the cursor will be set at lastest position or not when first created * default is <b>true</b> + * @param cursorProperties + * the properties for the Cursor * @param callback * callback object * @param ctx * opaque context */ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, - OpenCursorCallback callback, Object ctx); + Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx); /** * Get a list of all the cursors reading from this ManagedLedger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d1fe0df5d3d..1092ca0a89d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -92,6 +92,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; @@ -108,6 +109,7 @@ public class ManagedCursorImpl implements ManagedCursor { protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; + private volatile Map<String, String> cursorProperties; private final BookKeeper.DigestType digestType; protected volatile PositionImpl markDeletePosition; @@ -278,6 +280,7 @@ public class ManagedCursorImpl implements ManagedCursor { ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) { this.bookkeeper = bookkeeper; + this.cursorProperties = Collections.emptyMap(); this.config = config; this.ledger = ledger; this.name = cursorName; @@ -313,6 +316,52 @@ public class ManagedCursorImpl implements ManagedCursor { return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap(); } + @Override + public Map<String, String> getCursorProperties() { + return cursorProperties; + } + + @Override + public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) { + CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>(); + ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() { + @Override + public void operationComplete(ManagedCursorInfo info, Stat stat) { + ManagedCursorInfo copy = ManagedCursorInfo + .newBuilder(info) + .clearCursorProperties() + .addAllCursorProperties(buildStringPropertiesMap(cursorProperties)) + .build(); + ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), + name, copy, stat, new MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(), + name, cursorProperties); + ManagedCursorImpl.this.cursorProperties = cursorProperties; + cursorLedgerStat = stat; + updateCursorPropertiesResult.complete(result); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(), + name, cursorProperties, e); + updateCursorPropertiesResult.completeExceptionally(e); + } + }); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(), + name, cursorProperties, e); + updateCursorPropertiesResult.completeExceptionally(e); + } + }); + return updateCursorPropertiesResult; + } + @Override public boolean putProperty(String key, Long value) { if (lastMarkDeleteEntry != null) { @@ -361,6 +410,18 @@ public class ManagedCursorImpl implements ManagedCursor { cursorLedgerStat = stat; lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive; + + Map<String, String> recoveredCursorProperties = Collections.emptyMap(); + if (info.getCursorPropertiesCount() > 0) { + // Recover properties map + recoveredCursorProperties = Maps.newHashMap(); + for (int i = 0; i < info.getCursorPropertiesCount(); i++) { + StringProperty property = info.getCursorProperties(i); + recoveredCursorProperties.put(property.getName(), property.getValue()); + } + } + cursorProperties = recoveredCursorProperties; + if (info.getCursorsLedgerId() == -1L) { // There is no cursor ledger to read the last position from. It means the cursor has been properly // closed and the last mark-delete position is stored in the ManagedCursorInfo itself. @@ -380,7 +441,7 @@ public class ManagedCursorImpl implements ManagedCursor { } } - recoveredCursor(recoveredPosition, recoveredProperties, null); + recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null); callback.operationComplete(); } else { // Need to proceed and read the last entry in the specified ledger to find out the last position @@ -410,7 +471,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); return; } else if (rc != BKException.Code.OK) { log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, @@ -426,7 +487,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger", ledger.getName(), ledgerId, name); // Rewind to last cursor snapshot available - initialize(getRollbackPosition(info), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } @@ -438,7 +499,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc1 != BKException.Code.OK) { log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), @@ -476,7 +537,7 @@ public class ManagedCursorImpl implements ManagedCursor { && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } - recoveredCursor(position, recoveredProperties, lh); + recoveredCursor(position, recoveredProperties, cursorProperties, lh); callback.operationComplete(); }, null); }; @@ -547,6 +608,7 @@ public class ManagedCursorImpl implements ManagedCursor { } private void recoveredCursor(PositionImpl position, Map<String, Long> properties, + Map<String, String> cursorProperties, LedgerHandle recoveredFromCursorLedger) { // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty), // we need to move to the next existing ledger @@ -564,7 +626,7 @@ public class ManagedCursorImpl implements ManagedCursor { position = ledger.getLastPosition(); } log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position); - + this.cursorProperties = cursorProperties; messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition())); markDeletePosition = position; persistentMarkDeletePosition = position; @@ -577,8 +639,9 @@ public class ManagedCursorImpl implements ManagedCursor { STATE_UPDATER.set(this, State.NoLedger); } - void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) { - recoveredCursor(position, properties, null); + void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties, + final VoidCallback callback) { + recoveredCursor(position, properties, cursorProperties, null); if (log.isDebugEnabled()) { log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); @@ -2383,6 +2446,7 @@ public class ManagedCursorImpl implements ManagedCursor { .setLastActive(lastActive); // info.addAllProperties(buildPropertiesMap(properties)); + info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties)); if (persistIndividualDeletedMessageRanges) { info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); if (config.isDeletionAtBatchIndexLevelEnabled()) { @@ -2589,7 +2653,7 @@ public class ManagedCursorImpl implements ManagedCursor { }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name)); } - private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) { + private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) { if (properties.isEmpty()) { return Collections.emptyList(); } @@ -2603,6 +2667,20 @@ public class ManagedCursorImpl implements ManagedCursor { return longProperties; } + private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) { + if (properties == null || properties.isEmpty()) { + return Collections.emptyList(); + } + + List<StringProperty> stringProperties = Lists.newArrayList(); + properties.forEach((name, value) -> { + StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build(); + stringProperties.add(sp); + }); + + return stringProperties; + } + private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() { lock.readLock().lock(); try { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0879c48de7d..1c7297d880a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -845,11 +845,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException { - return openCursor(cursorName, initialPosition, Collections.emptyMap()); + return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap()); } @Override - public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties) + public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties, + Map<String, String> cursorProperties) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); class Result { @@ -858,7 +859,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } final Result result = new Result(); - asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() { + asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { result.cursor = cursor; @@ -893,12 +894,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, final OpenCursorCallback callback, final Object ctx) { - this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx); + this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(), + callback, ctx); } @Override public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, - Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) { + Map<String, Long> properties, Map<String, String> cursorProperties, + final OpenCursorCallback callback, final Object ctx) { try { checkManagedLedgerIsOpen(); checkFenced(); @@ -932,7 +935,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); - cursor.initialize(position, properties, new VoidCallback() { + cursor.initialize(position, properties, cursorProperties, new VoidCallback() { @Override public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 4671816c1a1..c4e502819fa 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -105,6 +105,11 @@ message LongProperty { required int64 value = 2; } +message StringProperty { + required string name = 1; + required string value = 2; +} + message ManagedCursorInfo { // If the ledger id is -1, then the mark-delete position is // the one from the (ledgerId, entryId) snapshot below @@ -123,6 +128,10 @@ message ManagedCursorInfo { // Store which index in the batch message has been deleted repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7; + + // Additional custom properties associated with + // the cursor + repeated StringProperty cursorProperties = 8; } enum CompressionType { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index a654b30e60b..05f34df47c1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -67,6 +67,11 @@ public class ManagedCursorContainerTest { return Collections.emptyMap(); } + @Override + public Map<String, String> getCursorProperties() { + return Collections.emptyMap(); + } + @Override public boolean putProperty(String key, Long value) { return false; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java index 727f3850ad2..74db9d791f3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java @@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals; import java.util.Collections; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -75,9 +77,15 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase { @Test(timeOut = 20000) void testPropertiesRecoveryAfterCrash() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); - ManagedCursor c1 = ledger.openCursor("c1"); + + Map<String, String> cursorProperties = new TreeMap<>(); + cursorProperties.put("custom1", "one"); + cursorProperties.put("custom2", "two"); + + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties); assertEquals(c1.getProperties(), Collections.emptyMap()); + assertEquals(c1.getCursorProperties(), cursorProperties); ledger.addEntry("entry-1".getBytes()); ledger.addEntry("entry-2".getBytes()); @@ -99,6 +107,7 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase { assertEquals(c1.getMarkDeletedPosition(), p3); assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); factory2.shutdown(); } @@ -148,8 +157,13 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase { properties.put("b", 2L); properties.put("c", 3L); - ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties); + Map<String, String> cursorProperties = new TreeMap<>(); + cursorProperties.put("custom1", "one"); + cursorProperties.put("custom2", "two"); + + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties); assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); ledger.addEntry("entry-1".getBytes()); @@ -160,6 +174,50 @@ public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase { c1 = ledger.openCursor("c1"); assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); } + @Test + void testUpdateCursorProperties() throws Exception { + ManagedLedger ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig()); + + Map<String, Long> properties = new TreeMap<>(); + properties.put("a", 1L); + + Map<String, String> cursorProperties = new TreeMap<>(); + cursorProperties.put("custom1", "one"); + cursorProperties.put("custom2", "two"); + + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties); + assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); + + ledger.addEntry("entry-1".getBytes()); + + Map<String, String> cursorPropertiesUpdated = new TreeMap<>(); + cursorPropertiesUpdated.put("custom1", "three"); + cursorPropertiesUpdated.put("custom2", "four"); + + c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS); + + ledger.close(); + + // Reopen the managed ledger + ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig()); + c1 = ledger.openCursor("c1"); + + assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated); + + // Create a new factory to force a managed ledger close and recovery + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // Reopen the managed ledger + ledger = factory2.open("testUpdateCursorProperties", new ManagedLedgerConfig()); + c1 = ledger.openCursor("c1"); + + assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated); + + factory2.shutdown(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index b1ccb4d1eb0..49b906b7959 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -103,6 +103,8 @@ public interface Subscription { Map<String, String> getSubscriptionProperties(); + CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties); + default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { // Default is no-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index ae49b3623ca..a9777f5dd0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -68,7 +68,7 @@ public class NonPersistentSubscription implements Subscription { private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); - private final Map<String, String> subscriptionProperties; + private volatile Map<String, String> subscriptionProperties; // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription. private final boolean isDurable; @@ -526,4 +526,15 @@ public class NonPersistentSubscription implements Subscription { public Map<String, String> getSubscriptionProperties() { return subscriptionProperties; } + + @Override + public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) { + if (subscriptionProperties == null || subscriptionProperties.isEmpty()) { + this.subscriptionProperties = Collections.emptyMap(); + } else { + this.subscriptionProperties = Collections.unmodifiableMap(subscriptionProperties); + } + return CompletableFuture.completedFuture(null); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 3b23e86f9ad..95dee2a3798 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -22,7 +22,6 @@ import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEvent import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -114,7 +113,7 @@ public class PersistentSubscription implements Subscription { private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; - private Map<String, String> subscriptionProperties; + private volatile Map<String, String> subscriptionProperties; private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); @@ -137,7 +136,7 @@ public class PersistentSubscription implements Subscription { } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated, Map<String, String> subscriptionProperties) { + boolean replicated, Map<String, String> subscriptionProperties) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); @@ -146,7 +145,7 @@ public class PersistentSubscription implements Subscription { this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) - ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties); + ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !checkTopicIsEventsNames(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); @@ -1094,6 +1093,20 @@ public class PersistentSubscription implements Subscription { return subscriptionProperties; } + @Override + public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) { + Map<String, String> newSubscriptionProperties; + if (subscriptionProperties == null || subscriptionProperties.isEmpty()) { + newSubscriptionProperties = Collections.emptyMap(); + } else { + newSubscriptionProperties = Collections.unmodifiableMap(subscriptionProperties); + } + return cursor.setCursorProperties(newSubscriptionProperties) + .thenRun(() -> { + this.subscriptionProperties = newSubscriptionProperties; + }); + } + /** * Return a merged map that contains the cursor properties specified by used * (eg. when using compaction subscription) and the subscription properties. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c2d5522f502..568ac5ae063 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -74,7 +74,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.net.BookieId; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; @@ -277,7 +276,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { final String subscriptionName = Codec.decode(cursor.getName()); subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null)); + PersistentSubscription.isCursorFromReplicatedSubscription(cursor), + cursor.getCursorProperties())); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now subscriptions.get(subscriptionName).deactivateCursor(); @@ -858,7 +858,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated); - ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() { + ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties, + new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { if (log.isDebugEnabled()) { @@ -878,11 +879,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return; } } - if (MapUtils.isEmpty(subscription.getSubscriptionProperties()) - && MapUtils.isNotEmpty(subscriptionProperties)) { - subscription.getSubscriptionProperties().putAll(subscriptionProperties); - } - if (replicated && !subscription.isReplicated()) { // Flip the subscription state subscription.setReplicated(replicated); @@ -961,11 +957,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return FutureUtil.failedFuture( new NotAllowedException("Durable subscription with the same name already exists.")); } - - if (MapUtils.isEmpty(subscription.getSubscriptionProperties()) - && MapUtils.isNotEmpty(subscriptionProperties)) { - subscription.getSubscriptionProperties().putAll(subscriptionProperties); - } } if (startMessageRollbackDurationSec > 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index e0f5e0a77a9..12b742a0191 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -213,10 +213,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); Map<String, String> properties = subscription.getSubscriptionProperties(); - assertTrue(properties.containsKey("1")); - assertTrue(properties.containsKey("2")); - assertEquals(properties.get("1"), "1"); - assertEquals(properties.get("2"), "2"); + assertEquals(properties, map); // after updating mark delete position, the properties should still exist Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); @@ -232,10 +229,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { assertEquals(subscription.getCursor().getMarkDeletedPosition().getEntryId(), messageId.getEntryId()); }); properties = subscription.getSubscriptionProperties(); - assertTrue(properties.containsKey("1")); - assertTrue(properties.containsKey("2")); - assertEquals(properties.get("1"), "1"); - assertEquals(properties.get("2"), "2"); + assertEquals(properties, map); consumer.close(); producer.close(); @@ -249,10 +243,7 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { .getTopicReference(topic).get().getSubscription(subName); Awaitility.await().untilAsserted(() -> { Map<String, String> properties2 = subscription2.getSubscriptionProperties(); - assertTrue(properties2.containsKey("1")); - assertTrue(properties2.containsKey("2")); - assertEquals(properties2.get("1"), "1"); - assertEquals(properties2.get("2"), "2"); + assertEquals(properties2, map); }); consumer2.close(); @@ -264,13 +255,11 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { .receiverQueueSize(1) .subscriptionProperties(map3).subscriptionName(subName).subscribe(); Map<String, String> properties3 = subscription.getSubscriptionProperties(); - assertTrue(properties3.containsKey("1")); - assertTrue(properties3.containsKey("2")); - assertEquals(properties3.get("1"), "1"); - assertEquals(properties3.get("2"), "2"); + assertEquals(properties3, map); consumer3.close(); - //restart and create a new consumer with new properties, the new properties should be updated + //restart and create a new consumer with new properties, the new properties must not be updated + // for a Durable subscription, but for a NonDurable subscription we pick up the new values restartBroker(); Consumer<byte[]> consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode) .topic(topic).receiverQueueSize(1) @@ -278,10 +267,12 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); Map<String, String> properties4 = subscription4.getSubscriptionProperties(); - assertTrue(properties4.containsKey("3")); - assertTrue(properties4.containsKey("4")); - assertEquals(properties4.get("3"), "3"); - assertEquals(properties4.get("4"), "4"); + if (subscriptionMode == SubscriptionMode.Durable) { + assertEquals(properties4, map); + } else { + assertEquals(properties4, map3); + + } consumer4.close(); @@ -294,26 +285,28 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { .getTopicReference(topic).get().getSubscription(subName); properties4 = subscription4.getSubscriptionProperties(); if (subscriptionMode == SubscriptionMode.Durable) { - assertTrue(properties4.containsKey("3")); - assertTrue(properties4.containsKey("4")); - assertEquals(properties4.get("3"), "3"); - assertEquals(properties4.get("4"), "4"); + assertEquals(properties4, map); } else { assertTrue(properties4.isEmpty()); } consumer4.close(); - //restart broker, it won't get any properties + //restart broker, properties for Durable subscription are reloaded from Metadata restartBroker(); consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode) .receiverQueueSize(1) .subscriptionName(subName).subscribe(); subscription4 = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); - assertEquals(subscription4.getSubscriptionProperties().size(), 0); + properties4 = subscription4.getSubscriptionProperties(); + if (subscriptionMode == SubscriptionMode.Durable) { + assertEquals(properties4, map); + } else { + assertTrue(properties4.isEmpty()); + } consumer4.close(); - //restart broker and create a new consumer with new properties, the properties will be updated + //restart broker and create a new consumer with new properties, the properties will not be updated restartBroker(); consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) @@ -321,16 +314,17 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { .subscriptionName(subName).subscribe(); PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); - properties = subscription5.getSubscriptionProperties(); - assertTrue(properties.containsKey("1")); - assertTrue(properties.containsKey("2")); - assertEquals(properties.get("1"), "1"); - assertEquals(properties.get("2"), "2"); - consumer4.close(); + properties4 = subscription5.getSubscriptionProperties(); + + // for the NonDurable subscription here we have the same properties because they + // are sent by the Consumer + assertEquals(properties4, map); + consumer4.close(); String subNameShared = "my-sub-shared"; Map<String, String> mapShared = new HashMap<>(); + mapShared.put("6", "7"); // open two consumers with a Shared Subscription Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) @@ -342,26 +336,25 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { properties = subscriptionShared.getSubscriptionProperties(); assertEquals(properties, mapShared); - // add a new consumer, the properties are updated because they were empty - mapShared = new HashMap<>(); - mapShared.put("6", "7"); - mapShared.put("8", "9"); + // add a new consumer, the properties are not updated + Map<String, String> mapShared2 = new HashMap<>(); + mapShared2.put("8", "9"); Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) .subscriptionType(SubscriptionType.Shared) - .subscriptionProperties(mapShared) + .subscriptionProperties(mapShared2) .subscriptionName(subNameShared).subscribe(); properties = subscriptionShared.getSubscriptionProperties(); assertEquals(properties, mapShared); - // add a third consumer, the properties are NOT updated because they are not empty - Map<String, String> mapShared2 = new HashMap<>(); - mapShared2.put("10", "11"); + // add a third consumer, the properties are NOT updated + Map<String, String> mapShared3 = new HashMap<>(); + mapShared3.put("10", "11"); Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) .subscriptionType(SubscriptionType.Shared) - .subscriptionProperties(mapShared2) + .subscriptionProperties(mapShared3) .subscriptionName(subNameShared).subscribe(); properties = subscriptionShared.getSubscriptionProperties(); @@ -373,6 +366,65 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { consumerShared3.close(); } + @Test + public void subscriptionModePersistedTest() throws Exception { + String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + Map<String, String> map = new HashMap<>(); + map.put("1", "1"); + map.put("2", "2"); + String subName = "my-sub"; + pulsarClient.newConsumer() + .subscriptionMode(SubscriptionMode.Durable) + .topic(topic) + .subscriptionProperties(map) + .subscriptionName(subName) + .subscribe() + .close(); + PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopicReference(topic).get().getSubscription(subName); + Map<String, String> properties = subscription.getSubscriptionProperties(); + assertTrue(properties.containsKey("1")); + assertTrue(properties.containsKey("2")); + assertEquals(properties.get("1"), "1"); + assertEquals(properties.get("2"), "2"); + + Map<String, String> subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties(); + assertEquals(map, subscriptionPropertiesFromAdmin); + + // unload the topic + admin.topics().unload(topic); + + // verify that the properties are still there + subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties(); + assertEquals(map, subscriptionPropertiesFromAdmin); + + + // create a new subscription, initially properties are empty + String subName2 = "my-sub2"; + admin.topics().createSubscription(topic, subName2, MessageId.latest); + + subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties(); + assertTrue(subscriptionPropertiesFromAdmin.isEmpty()); + + // create a consumer, this is not allowed to update the properties + pulsarClient.newConsumer() + .subscriptionMode(SubscriptionMode.Durable) + .topic(topic) + .subscriptionProperties(map) + .subscriptionName(subName2) + .subscribe() + .close(); + + // verify that the properties are not changed + subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties(); + assertTrue(subscriptionPropertiesFromAdmin.isEmpty()); + } + @Test public void createSubscriptionBySpecifyingStringPosition() throws IOException, PulsarAdminException { final int numberOfMessages = 5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index b75cffde81e..e376ec990fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1551,11 +1551,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null); return null; } }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), - any(OpenCursorCallback.class), any()); + any(Map.class), any(OpenCursorCallback.class), any()); doAnswer(new Answer<Object>() { @Override @@ -2204,9 +2204,9 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { return null; }).when(mockLedger).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any()); doAnswer((Answer<Object>) invocationOnMock -> { - ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(mockCursor, null); return null; - }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any()); + }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any()); PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService); CommandSubscribe cmd = new CommandSubscribe() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 5bef206a44d..afa5d1aad03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -1654,20 +1654,20 @@ public class ServerCnxTest { doAnswer((Answer<Object>) invocationOnMock -> { Thread.sleep(300); - ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); return null; }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); doAnswer((Answer<Object>) invocationOnMock -> { Thread.sleep(300); - ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null); return null; - }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), + }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), any(Map.class), any(OpenCursorCallback.class), any()); doAnswer((Answer<Object>) invocationOnMock -> { Thread.sleep(300); - ((OpenCursorCallback) invocationOnMock.getArguments()[2]) + ((OpenCursorCallback) invocationOnMock.getArguments()[3]) .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); return null; }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); @@ -1677,7 +1677,7 @@ public class ServerCnxTest { ((OpenCursorCallback) invocationOnMock.getArguments()[3]) .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); return null; - }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), + }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), any(Map.class), any(OpenCursorCallback.class), any()); doAnswer((Answer<Object>) invocationOnMock -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 81ad811f43c..b2edbda8855 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -69,10 +69,13 @@ public class FilterEntryTest extends BrokerTestBase { } public void testFilter() throws Exception { - + Map<String, String> map = new HashMap<>(); + map.put("1","1"); + map.put("2","2"); String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); String subName = "sub"; Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionProperties(map) .subscriptionName(subName).subscribe(); // mock entry filters PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() @@ -132,9 +135,6 @@ public class FilterEntryTest extends BrokerTestBase { }); consumer.close(); - Map<String, String> map = new HashMap<>(); - map.put("1","1"); - map.put("2","2"); consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map) .subscriptionName(subName).subscribe(); for (int i = 0; i < 10; i++) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 4ad872919e3..3eaf276c3c5 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -105,7 +105,8 @@ public class MockManagedLedger implements ManagedLedger { @Override public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition, - Map<String, Long> properties) throws InterruptedException, ManagedLedgerException { + Map<String, Long> properties, Map<String, String> cursorProperties) + throws InterruptedException, ManagedLedgerException { return null; } @@ -155,7 +156,8 @@ public class MockManagedLedger implements ManagedLedger { @Override public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition, - Map<String, Long> properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) { + Map<String, Long> properties, Map<String, String> cursorProperties, + AsyncCallbacks.OpenCursorCallback callback, Object ctx) { }
