This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 63df4cd Perform periodic flush of ManagedCursor mark-delete
posistions (#8634)
63df4cd is described below
commit 63df4cdbdfe2234d140b21ecee3a5f71b6a8f7f9
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Nov 19 16:30:14 2020 -0800
Perform periodic flush of ManagedCursor mark-delete posistions (#8634)
---
conf/broker.conf | 4 ++
conf/standalone.conf | 4 ++
.../mledger/ManagedLedgerFactoryConfig.java | 5 ++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 15 ++++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 53 +++++++++++++++++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 5 ++
.../pulsar/broker/ManagedLedgerClientFactory.java | 1 +
8 files changed, 107 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index ca50d27..d1a1238 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -726,6 +726,10 @@ managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2
+# How frequently to flush the cursor positions that were accumulated due to
rate limiting. (seconds).
+# Default is 60 seconds
+managedLedgerCursorPositionFlushSeconds = 60
+
# Default type of checksum to use when writing to BookKeeper. Default is
"CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 656cb52..8e41f84 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -508,6 +508,10 @@ managedLedgerDefaultWriteQuorum=1
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=1
+# How frequently to flush the cursor positions that were accumulated due to
rate limiting. (seconds).
+# Default is 60 seconds
+managedLedgerCursorPositionFlushSeconds = 60
+
# Default type of checksum to use when writing to BookKeeper. Default is
"CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index e42befc..ebfa23c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -63,6 +63,11 @@ public class ManagedLedgerFactoryConfig {
private int prometheusStatsLatencyRolloverSeconds = 60;
/**
+ * How frequently to flush the cursor positions that were accumulated due
to rate limiting.
+ */
+ private int cursorPositionFlushSeconds = 60;
+
+ /**
* cluster name for prometheus stats
*/
private String clusterName;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d752466..5865d7e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -170,6 +170,9 @@ public class ManagedCursorImpl implements ManagedCursor {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private RateLimiter markDeleteLimiter;
+ // The cursor is considered "dirty" when there are mark-delete updates
that are only applied in memory,
+ // because of the rate limiting.
+ private volatile boolean isDirty = false;
private boolean alwaysInactive = false;
@@ -1633,6 +1636,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
+ isDirty = true;
lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties,
null, null);
callback.markDeleteComplete(ctx);
return;
@@ -2877,6 +2881,24 @@ public class ManagedCursorImpl implements ManagedCursor {
this.entriesReadSize += readEntriesSize;
}
+ void flush() {
+ if (!isDirty) {
+ return;
+ }
+
+ isDirty = false;
+ asyncMarkDelete(lastMarkDeleteEntry.newPosition,
lastMarkDeleteEntry.properties, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
+ log.warn("[{}][{}] Failed to flush mark-delete position",
ledger.getName(), name, exception);
+ }
+ }, null);
+ }
+
private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 8b470c2..0faba71 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -108,6 +108,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private long lastStatTimestamp = System.nanoTime();
private final ScheduledFuture<?> statsTask;
+ private final ScheduledFuture<?> flushCursorsTask;
private final long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;
@@ -202,6 +203,8 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask =
scheduledExecutor.scheduleAtFixedRate(this::refreshStats, 0,
StatsPeriodSeconds, TimeUnit.SECONDS);
+ this.flushCursorsTask =
scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
+ config.getCursorPositionFlushSeconds(),
config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
@@ -230,6 +233,17 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
}
+ private synchronized void flushCursors() {
+ ledgers.values().forEach(mlfuture -> {
+ if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
+ ManagedLedgerImpl ml = mlfuture.getNow(null);
+ if (ml != null) {
+ ml.getCursors().forEach(c -> ((ManagedCursorImpl)
c).flush());
+ }
+ }
+ });
+ }
+
private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
@@ -483,6 +497,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
@Override
public void shutdown() throws InterruptedException, ManagedLedgerException
{
statsTask.cancel(true);
+ flushCursorsTask.cancel(true);
int numLedgers = ledgers.size();
final CountDownLatch latch = new CountDownLatch(numLedgers);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index f0d37ad..a4f8aa5 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3202,8 +3202,9 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
}
return result;
}
-
- void testReadEntriesOrWaitWithMaxSize() throws Exception {
+
+ @Test
+ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
ManagedLedger ledger =
factory.open("testReadEntriesOrWaitWithMaxSize");
ManagedCursor c = ledger.openCursor("c");
@@ -3227,5 +3228,53 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
entries.forEach(e -> e.release());
}
+ @Test
+ public void testFlushCursorAfterInactivity() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setThrottleMarkDelete(1.0);
+
+ ManagedLedgerFactoryConfig factoryConfig = new
ManagedLedgerFactoryConfig();
+ factoryConfig.setCursorPositionFlushSeconds(1);
+ ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc,
bkc.getZkHandle(), factoryConfig);
+ ManagedLedger ledger1 =
factory1.open("testFlushCursorAfterInactivity", config);
+ ManagedCursor c1 = ledger1.openCursor("c");
+ List<Position> positions = new ArrayList<Position>();
+
+ for (int i = 0; i < 20; i++) {
+ positions.add(ledger1.addEntry(new byte[1024]));
+ }
+
+ CountDownLatch latch = new CountDownLatch(positions.size());
+
+ positions.forEach(p -> c1.asyncMarkDelete(p, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception,
Object ctx) {
+ throw new RuntimeException(exception);
+ }
+ }, null));
+
+ latch.await();
+
+ assertEquals(c1.getMarkDeletedPosition(),
positions.get(positions.size() - 1));
+
+ // Give chance to the flush to be automatically triggered.
+ Thread.sleep(3000);
+
+ // Abruptly re-open the managed ledger without graceful close
+ ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc,
bkc.getZkHandle());
+ ManagedLedger ledger2 =
factory2.open("testFlushCursorAfterInactivity", config);
+ ManagedCursor c2 = ledger2.openCursor("c");
+
+ assertEquals(c2.getMarkDeletedPosition(),
positions.get(positions.size() - 1));
+
+ factory1.shutdown();
+ factory2.shutdown();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 48b07ff..5a03630 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1113,6 +1113,11 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int managedLedgerDefaultAckQuorum = 2;
+ @FieldContext(minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "How frequently to flush the cursor positions that were
accumulated due to rate limiting. (seconds). Default is 60 seconds")
+ private int managedLedgerCursorPositionFlushSeconds = 60;
+
//
//
@FieldContext(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index d96b32b..7254acb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -65,6 +65,7 @@ public class ManagedLedgerClientFactory implements Closeable {
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
+
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {