This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 61cb7ca  Use java.time.Clock in managed ledger (#1642)
61cb7ca is described below

commit 61cb7caa920f468b6fb53176e2994cefb8cbcae3
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Apr 26 23:49:33 2018 +0200

    Use java.time.Clock in managed ledger (#1642)
    
    Rather than direct calls to currentTimeMillis. This allows us to test
    time-based functionality with a mocked clock, and avoid Thread.sleep.
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 21 +++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  7 ++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 27 ++++++++++++----------
 3 files changed, 41 insertions(+), 14 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 3cb3fa9..f3add7c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Charsets;
+import java.time.Clock;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.api.DigestType;
@@ -57,6 +58,7 @@ public class ManagedLedgerConfig {
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
     private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+    private Clock clock = Clock.systemUTC();
 
     public boolean isCreateIfMissing() {
         return createIfMissing;
@@ -445,4 +447,23 @@ public class ManagedLedgerConfig {
         this.ledgerOffloader = offloader;
         return this;
     }
+
+    /**
+     * Get clock to use to time operations
+     *
+     * @return a clock
+     */
+    public Clock getClock() {
+        return clock;
+    }
+
+    /**
+     * Set clock to use for time operations
+     *
+     * @param clock the clock to use
+     */
+    public ManagedLedgerConfig setClock(Clock clock) {
+        this.clock = clock;
+        return this;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 6d9e0de..695f22d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -38,6 +38,7 @@ import com.google.common.collect.TreeRangeSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import java.time.Clock;
 import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.List;
@@ -155,6 +156,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     @SuppressWarnings("unused")
     private volatile int pendingMarkDeletedSubmittedCount = 0;
     private long lastLedgerSwitchTimestamp;
+    private final Clock clock;
 
     enum State {
         Uninitialized, // Cursor is being initialized
@@ -186,7 +188,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         PENDING_READ_OPS_UPDATER.set(this, 0);
         RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
         WAITING_READ_OP_UPDATER.set(this, null);
-        this.lastLedgerSwitchTimestamp = System.currentTimeMillis();
+        this.clock = config.getClock();
+        this.lastLedgerSwitchTimestamp = this.clock.millis();
 
         if (config.getThrottleMarkDelete() > 0.0) {
             markDeleteLimiter = 
RateLimiter.create(config.getThrottleMarkDelete());
@@ -2110,7 +2113,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     boolean shouldCloseLedger(LedgerHandle lh) {
-        long now = System.currentTimeMillis();
+        long now = clock.millis();
         if ((lh.getLastAddConfirmed() >= 
config.getMetadataMaxEntriesPerLedger()
                 || lastLedgerSwitchTimestamp < (now - 
config.getLedgerRolloverTimeout() * 1000))
                 && STATE_UPDATER.get(this) != State.Closed) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 50bc85d..59f4ad7 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,6 +34,7 @@ import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import java.time.Clock;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -209,6 +210,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private final OrderedExecutor executor;
     final ManagedLedgerFactoryImpl factory;
     protected final ManagedLedgerMBeanImpl mbean;
+    private final Clock clock;
 
     /**
      * Queue of pending entries to be added to the managed ledger. Typically 
entries are queued when a new ledger is
@@ -239,6 +241,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         this.waitingCursors = Queues.newConcurrentLinkedQueue();
         this.uninitializedCursors = Maps.newHashMap();
         this.updateCursorRateLimit = RateLimiter.create(1);
+        this.clock = config.getClock();
 
         // Get the next rollover time. Add a random value upto 5% to avoid 
rollover multiple ledgers at the same time
         this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() 
* (1 + random.nextDouble() * 5 / 100.0));
@@ -274,7 +277,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                             if (rc == BKException.Code.OK) {
                                 LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(id)
                                         .setEntries(lh.getLastAddConfirmed() + 
1).setSize(lh.getLength())
-                                        
.setTimestamp(System.currentTimeMillis()).build();
+                                        .setTimestamp(clock.millis()).build();
                                 ledgers.put(id, info);
                                 initializeBookKeeper(callback);
                             } else if (rc == 
BKException.Code.NoSuchLedgerExistsException) {
@@ -367,7 +370,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
                         log.info("[{}] Created ledger {}", name, lh.getId());
                         STATE_UPDATER.set(this, State.LedgerOpened);
-                        lastLedgerCreatedTimestamp = 
System.currentTimeMillis();
+                        lastLedgerCreatedTimestamp = clock.millis();
                         currentLedger = lh;
 
                         lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
@@ -541,7 +544,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 log.debug("[{}] Queue addEntry request", name);
             }
         } else if (state == State.ClosedLedger) {
-            long now = System.currentTimeMillis();
+            long now = clock.millis();
             if (now < lastLedgerCreationFailureTimestamp + 
WaitTimeAfterLedgerCreationFailureMs) {
                 // Deny the write request, since we haven't waited enough time 
since last attempt to create a new ledger
                 pendingAddEntries.remove(addOperation);
@@ -860,7 +863,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                             MessageMetadata msgMetadata = null;
                             try {
                                 msgMetadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-                                long msgTimeSincePublish = 
(System.currentTimeMillis() - msgMetadata.getPublishTime());
+                                long msgTimeSincePublish = (clock.millis() - 
msgMetadata.getPublishTime());
                                 if (msgTimeSincePublish > 
maxMessageCacheRetentionTimeMillis) {
                                     cursor.setInactive();
                                 }
@@ -1136,7 +1139,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
             // Empty the list of pending requests and make all of them fail
             clearPendingAddEntries(status);
-            lastLedgerCreationFailureTimestamp = System.currentTimeMillis();
+            lastLedgerCreationFailureTimestamp = clock.millis();
             STATE_UPDATER.set(this, State.ClosedLedger);
         } else {
             log.info("[{}] Created new ledger {}", name, lh.getId());
@@ -1189,7 +1192,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     ledgersListMutex.unlock();
 
                     synchronized (ManagedLedgerImpl.this) {
-                        lastLedgerCreationFailureTimestamp = 
System.currentTimeMillis();
+                        lastLedgerCreationFailureTimestamp = clock.millis();
                         STATE_UPDATER.set(ManagedLedgerImpl.this, 
State.ClosedLedger);
                         clearPendingAddEntries(e);
                     }
@@ -1215,7 +1218,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     public synchronized void updateLedgersIdsComplete(Stat stat) {
         STATE_UPDATER.set(this, State.LedgerOpened);
-        lastLedgerCreatedTimestamp = System.currentTimeMillis();
+        lastLedgerCreatedTimestamp = clock.millis();
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Resending {} pending messages", name, 
pendingAddEntries.size());
@@ -1269,7 +1272,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
         if (entriesInLedger > 0) {
             LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
-                    
.setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
+                    
.setSize(lh.getLength()).setTimestamp(clock.millis()).build();
             ledgers.put(lh.getId(), info);
         } else {
             // The last ledger was empty, so we can discard it
@@ -1546,7 +1549,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             return false;
         }
 
-        long elapsedMs = System.currentTimeMillis() - ledgerTimestamp;
+        long elapsedMs = clock.millis() - ledgerTimestamp;
         return elapsedMs > config.getRetentionTimeMillis();
     }
 
@@ -1610,7 +1613,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     log.debug(
                         "[{}] Checking ledger {} -- time-old: {} sec -- "
                             + "expired: {} -- over-quota: {} -- 
current-ledger: {}",
-                        name, ls.getLedgerId(), (System.currentTimeMillis() - 
ls.getTimestamp()) / 1000.0, expired,
+                        name, ls.getLedgerId(), (clock.millis() - 
ls.getTimestamp()) / 1000.0, expired,
                         overRetentionQuota, currentLedger.getId());
                 }
                 if (ls.getLedgerId() == currentLedger.getId() || (!expired && 
!overRetentionQuota)) {
@@ -2405,7 +2408,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         boolean spaceQuotaReached = (currentLedgerEntries >= 
config.getMaxEntriesPerLedger()
                 || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * 
MegaByte));
 
-        long timeSinceLedgerCreationMs = System.currentTimeMillis() - 
lastLedgerCreatedTimestamp;
+        long timeSinceLedgerCreationMs = clock.millis() - 
lastLedgerCreatedTimestamp;
         boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= 
maximumRolloverTimeMs;
 
         if (spaceQuotaReached || maxLedgerTimeReached) {
@@ -2414,7 +2417,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 boolean switchLedger = timeSinceLedgerCreationMs > 
config.getMinimumRolloverTimeMs();
                 if (log.isDebugEnabled()) {
                     log.debug("Diff: {}, threshold: {} -- switch: {}",
-                            System.currentTimeMillis() - 
lastLedgerCreatedTimestamp, config.getMinimumRolloverTimeMs(),
+                            clock.millis() - lastLedgerCreatedTimestamp, 
config.getMinimumRolloverTimeMs(),
                             switchLedger);
                 }
                 return switchLedger;

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to