This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8081ee26d8d [improve][broker][PIP-195] Make bucket merge operation asynchronous (#19873)
8081ee26d8d is described below
commit 8081ee26d8d3727c720800a3453a798893763fee
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Mar 25 15:39:37 2023 +0800
[improve][broker][PIP-195] Make bucket merge operation asynchronous (#19873)
---
.../bucket/BucketDelayedDeliveryTracker.java | 139 +++++++++++++--------
.../broker/delayed/bucket/ImmutableBucket.java | 17 ++-
.../broker/delayed/bucket/MutableBucket.java | 6 +
.../DelayedMessageIndexBucketSnapshotFormat.proto | 1 +
.../BookkeeperBucketSnapshotStorageTest.java | 3 +
.../bucket/BucketDelayedDeliveryTrackerTest.java | 57 +++++++--
6 files changed, 154 insertions(+), 69 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 31fdaa6fb76..1cf8bd2b20d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -90,6 +90,8 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private final Table<Long, Long, ImmutableBucket>
snapshotSegmentLastIndexTable;
+ private static final Long INVALID_BUCKET_ID = -1L;
+
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers
dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -246,12 +248,12 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
immutableBucket);
immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture
-> {
- CompletableFuture<Long> future =
createFuture.whenComplete((__, ex) -> {
+ CompletableFuture<Long> future =
createFuture.handle((bucketId, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
log.info("[{}] Creat bucket snapshot finish,
bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());
- return;
+ return bucketId;
}
//TODO Record create snapshot failed
@@ -277,6 +279,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getTimestamp());
}
+ return INVALID_BUCKET_ID;
});
immutableBucket.setSnapshotCreateFuture(future);
});
@@ -308,12 +311,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
lastMutableBucket.resetLastMutableBucketRange();
if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
- try {
- asyncMergeBucketSnapshot().get(2 *
AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
- } catch (Exception e) {
- // Ignore exception to merge bucket on the next schedule.
- log.error("[{}] An exception occurs when merge bucket
snapshot.", dispatcher.getName(), e);
- }
+ asyncMergeBucketSnapshot();
}
}
@@ -341,18 +339,26 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
List<ImmutableBucket> values =
immutableBuckets.asMapOfRanges().values().stream().toList();
long minNumberMessages = Long.MAX_VALUE;
+ long minScheduleTimestamp = Long.MAX_VALUE;
int minIndex = -1;
for (int i = 0; i + 1 < values.size(); i++) {
ImmutableBucket bucketL = values.get(i);
ImmutableBucket bucketR = values.get(i + 1);
- long numberMessages = bucketL.numberBucketDelayedMessages +
bucketR.numberBucketDelayedMessages;
- if (numberMessages < minNumberMessages) {
- minNumberMessages = (int) numberMessages;
- if (bucketL.lastSegmentEntryId >
bucketL.getCurrentSegmentEntryId()
- && bucketR.lastSegmentEntryId >
bucketR.getCurrentSegmentEntryId()
- &&
bucketL.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()
- &&
bucketR.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) {
- minIndex = i;
+ // We should skip the bucket which last segment already been load
to memory, avoid record replicated index.
+ if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
+ && bucketR.lastSegmentEntryId >
bucketR.getCurrentSegmentEntryId()
+ // Skip the bucket that is merging
+ && !bucketL.merging && !bucketR.merging){
+ long scheduleTimestamp =
+
Math.min(bucketL.firstScheduleTimestamps.get(bucketL.currentSegmentEntryId + 1),
+
bucketR.firstScheduleTimestamps.get(bucketR.currentSegmentEntryId + 1));
+ long numberMessages = bucketL.numberBucketDelayedMessages +
bucketR.numberBucketDelayedMessages;
+ if (scheduleTimestamp <= minScheduleTimestamp) {
+ minScheduleTimestamp = scheduleTimestamp;
+ if (numberMessages < minNumberMessages) {
+ minNumberMessages = numberMessages;
+ minIndex = i;
+ }
}
}
}
@@ -369,7 +375,14 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
log.info("[{}] Merging bucket snapshot, bucketAKey: {},
bucketBKey: {}", dispatcher.getName(),
immutableBucketA.bucketKey(),
immutableBucketB.bucketKey());
}
+
+ immutableBucketA.merging = true;
+ immutableBucketB.merging = true;
return asyncMergeBucketSnapshot(immutableBucketA,
immutableBucketB).whenComplete((__, ex) -> {
+ synchronized (this) {
+ immutableBucketA.merging = false;
+ immutableBucketB.merging = false;
+ }
if (ex != null) {
log.error("[{}] Failed to merge bucket snapshot, bucketAKey:
{}, bucketBKey: {}",
dispatcher.getName(), immutableBucketA.bucketKey(),
immutableBucketB.bucketKey(), ex);
@@ -382,46 +395,58 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private synchronized CompletableFuture<Void>
asyncMergeBucketSnapshot(ImmutableBucket bucketA,
ImmutableBucket bucketB) {
-
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
futureA =
- bucketA.getRemainSnapshotSegment();
-
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
futureB =
- bucketB.getRemainSnapshotSegment();
- return futureA.thenCombine(futureB,
CombinedSegmentDelayedIndexQueue::wrap)
- .thenAccept(combinedDelayedIndexQueue -> {
- Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
-
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
- timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment,
- sharedBucketPriorityQueue,
combinedDelayedIndexQueue, bucketA.startLedgerId,
- bucketB.endLedgerId);
-
- // Merge bit map to new bucket
- Map<Long, RoaringBitmap> delayedIndexBitMapA =
bucketA.getDelayedIndexBitMap();
- Map<Long, RoaringBitmap> delayedIndexBitMapB =
bucketB.getDelayedIndexBitMap();
- Map<Long, RoaringBitmap> delayedIndexBitMap = new
HashMap<>(delayedIndexBitMapA);
- delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
- delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> {
- if (bitMapA == null) {
- return bitMapB;
- }
+ CompletableFuture<Long> createAFuture =
bucketA.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
+ CompletableFuture<Long> createBFuture =
bucketB.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
- bitMapA.or(bitMapB);
- return bitMapA;
- });
- });
-
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
-
-
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+ return CompletableFuture.allOf(createAFuture,
createBFuture).thenCompose(bucketId -> {
+ if (INVALID_BUCKET_ID.equals(createAFuture.join()) ||
INVALID_BUCKET_ID.equals(createBFuture.join())) {
+ return FutureUtil.failedFuture(new RuntimeException("Can't
merge buckets due to bucket create failed"));
+ }
-
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
- .orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
- CompletableFuture<Void> removeAFuture =
bucketA.asyncDeleteBucketSnapshot();
- CompletableFuture<Void> removeBFuture =
bucketB.asyncDeleteBucketSnapshot();
- return CompletableFuture.allOf(removeAFuture,
removeBFuture);
+
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
futureA =
+ bucketA.getRemainSnapshotSegment();
+
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>
futureB =
+ bucketB.getRemainSnapshotSegment();
+ return futureA.thenCombine(futureB,
CombinedSegmentDelayedIndexQueue::wrap)
+ .thenAccept(combinedDelayedIndexQueue -> {
+ synchronized (BucketDelayedDeliveryTracker.this) {
+ Pair<ImmutableBucket, DelayedIndex>
immutableBucketDelayedIndexPair =
+
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
+
timeStepPerBucketSnapshotSegmentInMillis,
+ maxIndexesPerBucketSnapshotSegment,
+ sharedBucketPriorityQueue,
combinedDelayedIndexQueue, bucketA.startLedgerId,
+ bucketB.endLedgerId);
+
+ // Merge bit map to new bucket
+ Map<Long, RoaringBitmap> delayedIndexBitMapA =
bucketA.getDelayedIndexBitMap();
+ Map<Long, RoaringBitmap> delayedIndexBitMapB =
bucketB.getDelayedIndexBitMap();
+ Map<Long, RoaringBitmap> delayedIndexBitMap = new
HashMap<>(delayedIndexBitMapA);
+ delayedIndexBitMapB.forEach((ledgerId, bitMapB) ->
{
+ delayedIndexBitMap.compute(ledgerId, (k,
bitMapA) -> {
+ if (bitMapA == null) {
+ return bitMapB;
+ }
+
+ bitMapA.or(bitMapB);
+ return bitMapA;
+ });
+ });
+
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
+
+
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
+
+
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
+ .orElse(NULL_LONG_PROMISE).thenCompose(___
-> {
+ CompletableFuture<Void> removeAFuture
= bucketA.asyncDeleteBucketSnapshot();
+ CompletableFuture<Void> removeBFuture
= bucketB.asyncDeleteBucketSnapshot();
+ return
CompletableFuture.allOf(removeAFuture, removeBFuture);
+ });
+
+
immutableBuckets.remove(Range.closed(bucketA.startLedgerId,
bucketA.endLedgerId));
+
immutableBuckets.remove(Range.closed(bucketB.startLedgerId,
bucketB.endLedgerId));
+ }
});
-
-
immutableBuckets.remove(Range.closed(bucketA.startLedgerId,
bucketA.endLedgerId));
-
immutableBuckets.remove(Range.closed(bucketB.startLedgerId,
bucketB.endLedgerId));
- });
+ });
}
@Override
@@ -477,6 +502,12 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
ImmutableBucket bucket =
snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null &&
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+ if (bucket.merging) {
+ log.info("[{}] Skip load to wait for bucket snapshot merge
finish, bucketKey:{}",
+ dispatcher.getName(), bucket.bucketKey());
+ break;
+ }
+
final int preSegmentEntryId = bucket.currentSegmentEntryId;
if (log.isDebugEnabled()) {
log.debug("[{}] Loading next bucket snapshot segment,
bucketKey: {}, nextSegmentEntryId: {}",
@@ -525,7 +556,6 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
dispatcher.getName(), bucket.bucketKey(),
bucket.currentSegmentEntryId);
}
}).get(AsyncOperationTimeoutSeconds * MaxRetryTimes,
TimeUnit.SECONDS);
- snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
} catch (Exception e) {
// Ignore exception to reload this segment on the next
schedule.
log.error("[{}] An exception occurs when load next bucket
snapshot, bucketKey:{}",
@@ -533,6 +563,7 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
break;
}
}
+ snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
positions.add(new PositionImpl(ledgerId, entryId));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index c9223efa092..ab1c285011d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -22,6 +22,7 @@ import static
org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
import static
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
import static
org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
import com.google.protobuf.ByteString;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -44,7 +45,12 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
class ImmutableBucket extends Bucket {
@Setter
- private volatile
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
+ private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>
snapshotSegments;
+
+ boolean merging = false;
+
+ @Setter
+ List<Long> firstScheduleTimestamps = new ArrayList<>();
ImmutableBucket(String dispatcherName, ManagedCursor cursor,
BucketSnapshotStorage storage, long startLedgerId, long
endLedgerId) {
@@ -92,6 +98,9 @@ class ImmutableBucket extends Bucket {
this.setLastSegmentEntryId(metadataList.size());
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
+ List<Long> firstScheduleTimestamps =
metadataList.stream().map(
+
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+
this.setFirstScheduleTimestamps(firstScheduleTimestamps);
return nextSnapshotEntryIndex + 1;
});
@@ -157,8 +166,10 @@ class ImmutableBucket extends Bucket {
return
bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(),
nextSegmentEntryId,
lastSegmentEntryId).whenComplete((__, ex) -> {
if (ex != null) {
- log.warn("[{}] Failed to get remain bucket snapshot
segment, bucketKey: {}.",
- dispatcherName, bucketKey(), ex);
+ log.warn(
+ "[{}] Failed to get remain bucket snapshot
segment, bucketKey: {},"
+ + " nextSegmentEntryId: {},
lastSegmentEntryId: {}",
+ dispatcherName, bucketKey(), nextSegmentEntryId,
lastSegmentEntryId, ex);
}
});
}, BucketSnapshotPersistenceException.class, MaxRetryTimes);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index e743f39e692..1577bf8fa51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -75,11 +75,15 @@ class MutableBucket extends Bucket implements AutoCloseable
{
SnapshotSegment.Builder snapshotSegmentBuilder =
SnapshotSegment.newBuilder();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder =
SnapshotSegmentMetadata.newBuilder();
+ List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
+ long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
DelayedIndex delayedIndex = delayedIndexQueue.peek();
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
+ currentFirstTimestamp = timestamp;
+ firstScheduleTimestamps.add(currentFirstTimestamp);
currentTimestampUpperLimit = timestamp +
timeStepPerBucketSnapshotSegment - 1;
}
@@ -104,6 +108,7 @@ class MutableBucket extends Bucket implements AutoCloseable
{
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >=
maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
+
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
Iterator<Map.Entry<Long, RoaringBitmap>> iterator =
bitMap.entrySet().iterator();
@@ -134,6 +139,7 @@ class MutableBucket extends Bucket implements AutoCloseable
{
bucket.setCurrentSegmentEntryId(1);
bucket.setNumberBucketDelayedMessages(numMessages);
bucket.setLastSegmentEntryId(lastSegmentEntryId);
+ bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);
// Skip first segment, because it has already been loaded
List<SnapshotSegment> snapshotSegments =
bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
index 8414a583fe5..6996b860c52 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
@@ -31,6 +31,7 @@ message DelayedIndex {
message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
required uint64 max_schedule_timestamp = 2;
+ required uint64 min_schedule_timestamp = 3;
}
message SnapshotSegment {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 72052d22b85..a628b58e10d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -73,6 +73,7 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
public void testGetSnapshot() throws ExecutionException,
InterruptedException {
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+ .setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
@@ -122,6 +123,7 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
.setMaxScheduleTimestamp(timeMillis)
+ .setMinScheduleTimestamp(timeMillis)
.putAllDelayedIndexBitMap(map).build();
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata
snapshotMetadata =
@@ -172,6 +174,7 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
public void testGetBucketSnapshotLength() throws ExecutionException,
InterruptedException {
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata
segmentMetadata =
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+ .setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 74101f00b96..0d53c278fd2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -52,6 +52,7 @@ import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableLong;
import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -251,18 +252,28 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
}
@Test(dataProvider = "delayedTracker")
- public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
+ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) {
for (int i = 1; i <= 110; i++) {
tracker.addMessage(i, i, i * 10);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(
+
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x ->
x.merging));
+ });
}
assertEquals(110, tracker.getNumberOfDelayedMessages());
int size = tracker.getImmutableBuckets().asMapOfRanges().size();
- assertEquals(10, size);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(10, size);
+ });
tracker.addMessage(111, 1011, 111 * 10);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(
+
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x ->
x.merging));
+ });
MutableLong delayedMessagesInSnapshot = new MutableLong();
tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> {
@@ -271,26 +282,28 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
tracker.close();
- tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
- true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
+ BucketDelayedDeliveryTracker tracker2 = new
BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
+ true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1, 10);
- assertEquals(tracker.getNumberOfDelayedMessages(),
delayedMessagesInSnapshot.getValue());
+ assertEquals(tracker2.getNumberOfDelayedMessages(),
delayedMessagesInSnapshot.getValue());
for (int i = 1; i <= 110; i++) {
- tracker.addMessage(i, i, i * 10);
+ tracker2.addMessage(i, i, i * 10);
}
clockTime.set(110 * 10);
- NavigableSet<PositionImpl> scheduledMessages =
tracker.getScheduledMessages(110);
+ NavigableSet<PositionImpl> scheduledMessages =
tracker2.getScheduledMessages(110);
for (int i = 1; i <= 110; i++) {
PositionImpl position = scheduledMessages.pollFirst();
assertEquals(position, PositionImpl.get(i, i));
}
+
+ tracker2.close();
}
@Test(dataProvider = "delayedTracker")
- public void testWithBkException(BucketDelayedDeliveryTracker tracker) {
+ public void testWithBkException(final BucketDelayedDeliveryTracker
tracker) {
MockBucketSnapshotStorage mockBucketSnapshotStorage =
(MockBucketSnapshotStorage) bucketSnapshotStorage;
mockBucketSnapshotStorage.injectCreateException(
new BucketSnapshotPersistenceException("Bookie operation
timeout, op: Create entry"));
@@ -308,11 +321,25 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
for (int i = 1; i <= 110; i++) {
tracker.addMessage(i, i, i * 10);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(
+
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x ->
x.merging));
+ });
}
assertEquals(110, tracker.getNumberOfDelayedMessages());
+ int size = tracker.getImmutableBuckets().asMapOfRanges().size();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(10, size);
+ });
+
tracker.addMessage(111, 1011, 111 * 10);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(
+
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x ->
x.merging));
+ });
MutableLong delayedMessagesInSnapshot = new MutableLong();
tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> {
@@ -321,11 +348,11 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
tracker.close();
- tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000,
clock,
+ BucketDelayedDeliveryTracker tracker2 = new
BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5,
TimeUnit.MILLISECONDS.toMillis(10), -1,10);
Long delayedMessagesInSnapshotValue =
delayedMessagesInSnapshot.getValue();
- assertEquals(tracker.getNumberOfDelayedMessages(),
delayedMessagesInSnapshotValue);
+ assertEquals(tracker2.getNumberOfDelayedMessages(),
delayedMessagesInSnapshotValue);
clockTime.set(110 * 10);
@@ -338,14 +365,16 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
mockBucketSnapshotStorage.injectGetSegmentException(
new BucketSnapshotPersistenceException("Bookie operation
timeout4, op: Get entry"));
- assertEquals(tracker.getScheduledMessages(100).size(), 0);
+ assertEquals(tracker2.getScheduledMessages(100).size(), 0);
- assertEquals(tracker.getScheduledMessages(100).size(),
delayedMessagesInSnapshotValue);
+ assertEquals(tracker2.getScheduledMessages(100).size(),
delayedMessagesInSnapshotValue);
assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());
assertTrue(mockBucketSnapshotStorage.getSegmentExceptionQueue.isEmpty());
assertTrue(mockBucketSnapshotStorage.deleteExceptionQueue.isEmpty());
+
+ tracker2.close();
}
@Test(dataProvider = "delayedTracker")
@@ -390,6 +419,8 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
assertEquals(bucket.getLastSegmentEntryId(), 4);
});
+
+ tracker.close();
}
@Test(dataProvider = "delayedTracker")
@@ -408,5 +439,7 @@ public class BucketDelayedDeliveryTrackerTest extends
AbstractDeliveryTrackerTes
assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
assertEquals(tracker.getLastMutableBucket().size(), 0);
assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0);
+
+ tracker.close();
}
}