This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 61cb7ca Use java.time.Clock in managed ledger (#1642)
61cb7ca is described below
commit 61cb7caa920f468b6fb53176e2994cefb8cbcae3
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Apr 26 23:49:33 2018 +0200
Use java.time.Clock in managed ledger (#1642)
Rather than direct calls to currentTimeMillis. This allows us to test
time based functionallity with a mocked clock, and avoid Thread.sleep.
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 21 +++++++++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 7 ++++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 27 ++++++++++++----------
3 files changed, 41 insertions(+), 14 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 3cb3fa9..f3add7c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -22,6 +22,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.Beta;
import com.google.common.base.Charsets;
+import java.time.Clock;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.api.DigestType;
@@ -57,6 +58,7 @@ public class ManagedLedgerConfig {
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+ private Clock clock = Clock.systemUTC();
public boolean isCreateIfMissing() {
return createIfMissing;
@@ -445,4 +447,23 @@ public class ManagedLedgerConfig {
this.ledgerOffloader = offloader;
return this;
}
+
+ /**
+ * Get clock to use to time operations
+ *
+ * @return a clock
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Set clock to use for time operations
+ *
+ * @param clock the clock to use
+ */
+ public ManagedLedgerConfig setClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 6d9e0de..695f22d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -38,6 +38,7 @@ import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.time.Clock;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
@@ -155,6 +156,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@SuppressWarnings("unused")
private volatile int pendingMarkDeletedSubmittedCount = 0;
private long lastLedgerSwitchTimestamp;
+ private final Clock clock;
enum State {
Uninitialized, // Cursor is being initialized
@@ -186,7 +188,8 @@ public class ManagedCursorImpl implements ManagedCursor {
PENDING_READ_OPS_UPDATER.set(this, 0);
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null);
- this.lastLedgerSwitchTimestamp = System.currentTimeMillis();
+ this.clock = config.getClock();
+ this.lastLedgerSwitchTimestamp = this.clock.millis();
if (config.getThrottleMarkDelete() > 0.0) {
markDeleteLimiter =
RateLimiter.create(config.getThrottleMarkDelete());
@@ -2110,7 +2113,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
boolean shouldCloseLedger(LedgerHandle lh) {
- long now = System.currentTimeMillis();
+ long now = clock.millis();
if ((lh.getLastAddConfirmed() >=
config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now -
config.getLedgerRolloverTimeout() * 1000))
&& STATE_UPDATER.get(this) != State.Closed) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 50bc85d..59f4ad7 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,6 +34,7 @@ import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -209,6 +210,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
+ private final Clock clock;
/**
* Queue of pending entries to be added to the managed ledger. Typically
entries are queued when a new ledger is
@@ -239,6 +241,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.updateCursorRateLimit = RateLimiter.create(1);
+ this.clock = config.getClock();
// Get the next rollover time. Add a random value upto 5% to avoid
rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs()
* (1 + random.nextDouble() * 5 / 100.0));
@@ -274,7 +277,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (rc == BKException.Code.OK) {
LedgerInfo info =
LedgerInfo.newBuilder().setLedgerId(id)
.setEntries(lh.getLastAddConfirmed() +
1).setSize(lh.getLength())
-
.setTimestamp(System.currentTimeMillis()).build();
+ .setTimestamp(clock.millis()).build();
ledgers.put(id, info);
initializeBookKeeper(callback);
} else if (rc ==
BKException.Code.NoSuchLedgerExistsException) {
@@ -367,7 +370,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
- lastLedgerCreatedTimestamp =
System.currentTimeMillis();
+ lastLedgerCreatedTimestamp = clock.millis();
currentLedger = lh;
lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
@@ -541,7 +544,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug("[{}] Queue addEntry request", name);
}
} else if (state == State.ClosedLedger) {
- long now = System.currentTimeMillis();
+ long now = clock.millis();
if (now < lastLedgerCreationFailureTimestamp +
WaitTimeAfterLedgerCreationFailureMs) {
// Deny the write request, since we haven't waited enough time
since last attempt to create a new ledger
pendingAddEntries.remove(addOperation);
@@ -860,7 +863,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
MessageMetadata msgMetadata = null;
try {
msgMetadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
- long msgTimeSincePublish =
(System.currentTimeMillis() - msgMetadata.getPublishTime());
+ long msgTimeSincePublish = (clock.millis() -
msgMetadata.getPublishTime());
if (msgTimeSincePublish >
maxMessageCacheRetentionTimeMillis) {
cursor.setInactive();
}
@@ -1136,7 +1139,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// Empty the list of pending requests and make all of them fail
clearPendingAddEntries(status);
- lastLedgerCreationFailureTimestamp = System.currentTimeMillis();
+ lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(this, State.ClosedLedger);
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
@@ -1189,7 +1192,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
ledgersListMutex.unlock();
synchronized (ManagedLedgerImpl.this) {
- lastLedgerCreationFailureTimestamp =
System.currentTimeMillis();
+ lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(ManagedLedgerImpl.this,
State.ClosedLedger);
clearPendingAddEntries(e);
}
@@ -1215,7 +1218,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
- lastLedgerCreatedTimestamp = System.currentTimeMillis();
+ lastLedgerCreatedTimestamp = clock.millis();
if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name,
pendingAddEntries.size());
@@ -1269,7 +1272,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
if (entriesInLedger > 0) {
LedgerInfo info =
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
-
.setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
+
.setSize(lh.getLength()).setTimestamp(clock.millis()).build();
ledgers.put(lh.getId(), info);
} else {
// The last ledger was empty, so we can discard it
@@ -1546,7 +1549,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return false;
}
- long elapsedMs = System.currentTimeMillis() - ledgerTimestamp;
+ long elapsedMs = clock.millis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
}
@@ -1610,7 +1613,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
+ "expired: {} -- over-quota: {} --
current-ledger: {}",
- name, ls.getLedgerId(), (System.currentTimeMillis() -
ls.getTimestamp()) / 1000.0, expired,
+ name, ls.getLedgerId(), (clock.millis() -
ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId() || (!expired &&
!overRetentionQuota)) {
@@ -2405,7 +2408,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
boolean spaceQuotaReached = (currentLedgerEntries >=
config.getMaxEntriesPerLedger()
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() *
MegaByte));
- long timeSinceLedgerCreationMs = System.currentTimeMillis() -
lastLedgerCreatedTimestamp;
+ long timeSinceLedgerCreationMs = clock.millis() -
lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >=
maximumRolloverTimeMs;
if (spaceQuotaReached || maxLedgerTimeReached) {
@@ -2414,7 +2417,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
boolean switchLedger = timeSinceLedgerCreationMs >
config.getMinimumRolloverTimeMs();
if (log.isDebugEnabled()) {
log.debug("Diff: {}, threshold: {} -- switch: {}",
- System.currentTimeMillis() -
lastLedgerCreatedTimestamp, config.getMinimumRolloverTimeMs(),
+ clock.millis() - lastLedgerCreatedTimestamp,
config.getMinimumRolloverTimeMs(),
switchLedger);
}
return switchLedger;
--
To stop receiving notification emails like this one, please contact
[email protected].