merlimat closed pull request #1642: Use java.time.Clock in managed ledger
URL: https://github.com/apache/incubator-pulsar/pull/1642
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 3cb3fa9209..f3add7c4fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -22,6 +22,7 @@
import com.google.common.annotations.Beta;
import com.google.common.base.Charsets;
+import java.time.Clock;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.api.DigestType;
@@ -57,6 +58,7 @@
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+ private Clock clock = Clock.systemUTC();
public boolean isCreateIfMissing() {
return createIfMissing;
@@ -445,4 +447,23 @@ public ManagedLedgerConfig
setLedgerOffloader(LedgerOffloader offloader) {
this.ledgerOffloader = offloader;
return this;
}
+
+ /**
+ * Get clock to use to time operations
+ *
+ * @return a clock
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Set clock to use for time operations
+ *
+ * @param clock the clock to use
+ */
+ public ManagedLedgerConfig setClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 6d9e0de284..695f22dcd6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -38,6 +38,7 @@
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.time.Clock;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
@@ -155,6 +156,7 @@ public MarkDeleteEntry(PositionImpl newPosition,
Map<String, Long> properties,
@SuppressWarnings("unused")
private volatile int pendingMarkDeletedSubmittedCount = 0;
private long lastLedgerSwitchTimestamp;
+ private final Clock clock;
enum State {
Uninitialized, // Cursor is being initialized
@@ -186,7 +188,8 @@ public MarkDeleteEntry(PositionImpl newPosition,
Map<String, Long> properties,
PENDING_READ_OPS_UPDATER.set(this, 0);
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null);
- this.lastLedgerSwitchTimestamp = System.currentTimeMillis();
+ this.clock = config.getClock();
+ this.lastLedgerSwitchTimestamp = this.clock.millis();
if (config.getThrottleMarkDelete() > 0.0) {
markDeleteLimiter =
RateLimiter.create(config.getThrottleMarkDelete());
@@ -2110,7 +2113,7 @@ public void operationFailed(MetaStoreException e) {
}
boolean shouldCloseLedger(LedgerHandle lh) {
- long now = System.currentTimeMillis();
+ long now = clock.millis();
if ((lh.getLastAddConfirmed() >=
config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now -
config.getLedgerRolloverTimeout() * 1000))
&& STATE_UPDATER.get(this) != State.Closed) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 50bc85d445..59f4ad7b12 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,6 +34,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -209,6 +210,7 @@
private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
+ private final Clock clock;
/**
* Queue of pending entries to be added to the managed ledger. Typically
entries are queued when a new ledger is
@@ -239,6 +241,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory,
BookKeeper bookKeeper
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.updateCursorRateLimit = RateLimiter.create(1);
+ this.clock = config.getClock();
// Get the next rollover time. Add a random value upto 5% to avoid
rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs()
* (1 + random.nextDouble() * 5 / 100.0));
@@ -274,7 +277,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo,
Stat stat) {
if (rc == BKException.Code.OK) {
LedgerInfo info =
LedgerInfo.newBuilder().setLedgerId(id)
.setEntries(lh.getLastAddConfirmed() +
1).setSize(lh.getLength())
- .setTimestamp(System.currentTimeMillis()).build();
+ .setTimestamp(clock.millis()).build();
ledgers.put(id, info);
initializeBookKeeper(callback);
} else if (rc ==
BKException.Code.NoSuchLedgerExistsException) {
@@ -367,7 +370,7 @@ public void operationFailed(MetaStoreException e) {
log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
- lastLedgerCreatedTimestamp =
System.currentTimeMillis();
+ lastLedgerCreatedTimestamp = clock.millis();
currentLedger = lh;
lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
@@ -541,7 +544,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry
addOperation) {
log.debug("[{}] Queue addEntry request", name);
}
} else if (state == State.ClosedLedger) {
- long now = System.currentTimeMillis();
+ long now = clock.millis();
if (now < lastLedgerCreationFailureTimestamp +
WaitTimeAfterLedgerCreationFailureMs) {
// Deny the write request, since we haven't waited enough time
since last attempt to create a new ledger
pendingAddEntries.remove(addOperation);
@@ -860,7 +863,7 @@ public void readEntryComplete(Entry entry, Object ctx) {
MessageMetadata msgMetadata = null;
try {
msgMetadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
- long msgTimeSincePublish =
(System.currentTimeMillis() - msgMetadata.getPublishTime());
+ long msgTimeSincePublish = (clock.millis() -
msgMetadata.getPublishTime());
if (msgTimeSincePublish >
maxMessageCacheRetentionTimeMillis) {
cursor.setInactive();
}
@@ -1136,7 +1139,7 @@ public synchronized void createComplete(int rc, final
LedgerHandle lh, Object ct
// Empty the list of pending requests and make all of them fail
clearPendingAddEntries(status);
- lastLedgerCreationFailureTimestamp = System.currentTimeMillis();
+ lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(this, State.ClosedLedger);
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
@@ -1189,7 +1192,7 @@ public void operationFailed(MetaStoreException e) {
ledgersListMutex.unlock();
synchronized (ManagedLedgerImpl.this) {
- lastLedgerCreationFailureTimestamp =
System.currentTimeMillis();
+ lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(ManagedLedgerImpl.this,
State.ClosedLedger);
clearPendingAddEntries(e);
}
@@ -1215,7 +1218,7 @@ private void
updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
- lastLedgerCreatedTimestamp = System.currentTimeMillis();
+ lastLedgerCreatedTimestamp = clock.millis();
if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name,
pendingAddEntries.size());
@@ -1269,7 +1272,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
}
if (entriesInLedger > 0) {
LedgerInfo info =
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
- .setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
+ .setSize(lh.getLength()).setTimestamp(clock.millis()).build();
ledgers.put(lh.getId(), info);
} else {
// The last ledger was empty, so we can discard it
@@ -1546,7 +1549,7 @@ private boolean hasLedgerRetentionExpired(long
ledgerTimestamp) {
return false;
}
- long elapsedMs = System.currentTimeMillis() - ledgerTimestamp;
+ long elapsedMs = clock.millis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
}
@@ -1610,7 +1613,7 @@ void internalTrimConsumedLedgers() {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
+ "expired: {} -- over-quota: {} --
current-ledger: {}",
- name, ls.getLedgerId(), (System.currentTimeMillis() -
ls.getTimestamp()) / 1000.0, expired,
+ name, ls.getLedgerId(), (clock.millis() -
ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId() || (!expired &&
!overRetentionQuota)) {
@@ -2405,7 +2408,7 @@ private boolean currentLedgerIsFull() {
boolean spaceQuotaReached = (currentLedgerEntries >=
config.getMaxEntriesPerLedger()
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() *
MegaByte));
- long timeSinceLedgerCreationMs = System.currentTimeMillis() -
lastLedgerCreatedTimestamp;
+ long timeSinceLedgerCreationMs = clock.millis() -
lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >=
maximumRolloverTimeMs;
if (spaceQuotaReached || maxLedgerTimeReached) {
@@ -2414,7 +2417,7 @@ private boolean currentLedgerIsFull() {
boolean switchLedger = timeSinceLedgerCreationMs >
config.getMinimumRolloverTimeMs();
if (log.isDebugEnabled()) {
log.debug("Diff: {}, threshold: {} -- switch: {}",
- System.currentTimeMillis() -
lastLedgerCreatedTimestamp, config.getMinimumRolloverTimeMs(),
+ clock.millis() - lastLedgerCreatedTimestamp,
config.getMinimumRolloverTimeMs(),
switchLedger);
}
return switchLedger;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services