This is an automated email from the ASF dual-hosted git repository.

kw2542 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 702fd815f SAMZA-2753: Consolidate Clocks (#1617)
702fd815f is described below

commit 702fd815fe40bfb23e8d3412e5e8848b585a5d69
Author: Ke Wu <[email protected]>
AuthorDate: Thu Jul 21 11:06:28 2022 -0700

    SAMZA-2753: Consolidate Clocks (#1617)
    
    * SAMZA-2753: Consolidate Clocks
    
    Issues: Improve the usability of clock related interface/classes
    
    Changes:
    1. Move SystemClock from scala to java
    2. Enrich SystemHighResolutionClock for ease of use
    3. Add comments to Clock
    
    API Changes: N/A
    Upgrade Instructions: N/A
    Usage Instructions: N/A
    
    * Fix constructions of SystemClock
    
    * Fix checkstyle
---
 .../src/main/java/org/apache/samza/util/Clock.java   | 12 +++++++++++-
 .../org/apache/samza/storage/StorageRecovery.java    |  2 +-
 .../org/apache/samza/util/HighResolutionClock.java   |  2 +-
 .../org/apache/samza/util/SystemClock.java}          | 19 +++++++++++++------
 .../apache/samza/util/SystemHighResolutionClock.java | 14 +++++++++++++-
 .../org/apache/samza/util/ThrottlingExecutor.java    |  2 +-
 .../org/apache/samza/util/ThrottlingScheduler.java   |  2 +-
 .../org/apache/samza/container/SamzaContainer.scala  |  4 ++--
 .../org/apache/samza/operators/TestJoinOperator.java | 20 ++++++++++----------
 .../samza/storage/TestContainerStorageManager.java   |  4 ++--
 .../samza/system/kafka/KafkaSystemFactory.scala      |  2 +-
 .../samza/storage/TestTaskStorageManager.scala       |  2 +-
 .../samza/system/kafka/TestKafkaSystemConsumer.java  |  2 +-
 13 files changed, 58 insertions(+), 29 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/util/Clock.java 
b/samza-api/src/main/java/org/apache/samza/util/Clock.java
index db92114a0..34b4de5ae 100644
--- a/samza-api/src/main/java/org/apache/samza/util/Clock.java
+++ b/samza-api/src/main/java/org/apache/samza/util/Clock.java
@@ -20,8 +20,18 @@
 package org.apache.samza.util;
 
 /**
- * Mockable interface for tracking time.
+ * An object that can provide time points (useful for getting the elapsed time 
between two time
+ * points).
+ * <p>
+ * Instances of this interface must be thread-safe.
  */
 public interface Clock {
+  /**
+   * Returns a time point that can be used to calculate the difference in 
milliseconds with another
+   * time point. Resolution of the timer is platform dependent and not 
guaranteed to actually
+   * operate at millisecond precision.
+   *
+   * @return current time point in milliseconds
+   */
   long currentTimeMillis();
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 6b657d59d..bd4a537df 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -252,7 +252,7 @@ public class StorageRecovery {
               storeBaseDir,
               storeBaseDir,
               null,
-              new SystemClock());
+              SystemClock.instance());
       this.containerStorageManagers.put(containerModel.getId(), 
containerStorageManager);
     }
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java 
b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
index 6d40149d9..f52b17ade 100644
--- a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
+++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
@@ -21,7 +21,7 @@ package org.apache.samza.util;
 
 /**
  * An object that can provide time points (useful for getting the elapsed time 
between two time
- * points) and can sleep for a specified period of time.
+ * points).
  * <p>
  * Instances of this interface must be thread-safe.
  */
diff --git a/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala 
b/samza-core/src/main/java/org/apache/samza/util/SystemClock.java
similarity index 73%
rename from samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
rename to samza-core/src/main/java/org/apache/samza/util/SystemClock.java
index 62253d63c..bc184c709 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
+++ b/samza-core/src/main/java/org/apache/samza/util/SystemClock.java
@@ -17,16 +17,23 @@
  * under the License.
  */
 
-package org.apache.samza.util
+package org.apache.samza.util;
 
 /**
  * Default implementation of the Clock interface, which uses the real
  * system clock.
  */
-class SystemClock extends Clock {
-  override def currentTimeMillis = System.currentTimeMillis
-}
+public class SystemClock implements Clock {
+  private static final SystemClock INSTANCE = new SystemClock();
+
+  private SystemClock() {}
+
+  public static SystemClock instance() {
+    return INSTANCE;
+  }
 
-object SystemClock {
-  val instance = new SystemClock
+  @Override
+  public long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java 
b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
index 6bfe7c6af..cd62a773a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
+++ 
b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
@@ -19,7 +19,19 @@
 
 package org.apache.samza.util;
 
-class SystemHighResolutionClock implements HighResolutionClock {
+/**
+ * Default implementation of the HighResolutionClock interface, which uses the
+ * real system clock.
+ */
+public class SystemHighResolutionClock implements HighResolutionClock {
+  private static final SystemHighResolutionClock INSTANCE = new 
SystemHighResolutionClock();
+
+  private SystemHighResolutionClock() {}
+
+  public static SystemHighResolutionClock instance() {
+    return INSTANCE;
+  }
+
   @Override
   public long nanoTime() {
     return System.nanoTime();
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java 
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index fe8dba047..54d4cdf5c 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -38,7 +38,7 @@ public class ThrottlingExecutor implements Throttleable, 
Executor {
   private long pendingNanos;
 
   public ThrottlingExecutor(long maxDelayMillis) {
-    this(maxDelayMillis, new SystemHighResolutionClock());
+    this(maxDelayMillis, SystemHighResolutionClock.instance());
   }
 
   ThrottlingExecutor(long maxDelayMillis, HighResolutionClock clock) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java 
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
index 265d46d3a..fe5ac13ad 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
@@ -42,7 +42,7 @@ public class ThrottlingScheduler implements Throttleable {
   public ThrottlingScheduler(long maxDelayMillis) {
     this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
     this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
-    this.clock = new SystemHighResolutionClock();
+    this.clock = SystemHighResolutionClock.instance();
   }
 
   ThrottlingScheduler(long maxDelayMillis, ScheduledExecutorService 
scheduledExecutorService,
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e6c188db5..c157b8776 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -543,7 +543,7 @@ object SamzaContainer extends Logging {
       loggedStorageBaseDir,
       nonLoggedStorageBaseDir,
       serdeManager,
-      new SystemClock)
+      SystemClock.instance())
 
     storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
 
@@ -576,7 +576,7 @@ object SamzaContainer extends Logging {
               taskInstanceMetrics.get(taskName).isDefined) 
taskInstanceMetrics.get(taskName).get.registry
             else new MetricsRegistryMap
           val taskBackupManager = factory.getBackupManager(jobContext, 
containerModel,
-            taskModel, commitThreadPool, taskMetricsRegistry, config, new 
SystemClock,
+            taskModel, commitThreadPool, taskMetricsRegistry, config, 
SystemClock.instance(),
             loggedStorageBaseDir, nonLoggedStorageBaseDir)
           taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager)
         }
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 907dd7d99..e2c4d72cc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -76,7 +76,7 @@ public class TestJoinOperator {
   @Test
   public void join() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -108,14 +108,14 @@ public class TestJoinOperator {
       inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, 
kvSerde, JOIN_TTL, "join");
     }, config);
 
-    createStreamOperatorTask(new SystemClock(), streamAppDesc); // should 
throw an exception
+    createStreamOperatorTask(SystemClock.instance(), streamAppDesc); // should 
throw an exception
   }
 
   @Test
   public void joinFnInitAndClose() throws Exception {
     TestJoinFunction joinFn = new TestJoinFunction();
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(joinFn);
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
 
     MessageCollector messageCollector = mock(MessageCollector.class);
 
@@ -134,7 +134,7 @@ public class TestJoinOperator {
   @Test
   public void joinReverse() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -150,7 +150,7 @@ public class TestJoinOperator {
   @Test
   public void joinNoMatch() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -165,7 +165,7 @@ public class TestJoinOperator {
   @Test
   public void joinNoMatchReverse() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -180,7 +180,7 @@ public class TestJoinOperator {
   @Test
   public void joinRetainsLatestMessageForKey() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -198,7 +198,7 @@ public class TestJoinOperator {
   @Test
   public void joinRetainsLatestMessageForKeyReverse() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -216,7 +216,7 @@ public class TestJoinOperator {
   @Test
   public void joinRetainsMatchedMessages() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -239,7 +239,7 @@ public class TestJoinOperator {
   @Test
   public void joinRetainsMatchedMessagesReverse() throws Exception {
     StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
+    StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index eed7d5442..810433258 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -268,7 +268,7 @@ public class TestContainerStorageManager {
         DEFAULT_LOGGED_STORE_BASE_DIR,
         DEFAULT_STORE_BASE_DIR,
         null,
-        new SystemClock());
+        SystemClock.instance());
     this.testContext = new StandbyTestContext();
   }
 
@@ -355,7 +355,7 @@ public class TestContainerStorageManager {
         DEFAULT_LOGGED_STORE_BASE_DIR,
         DEFAULT_STORE_BASE_DIR,
         null,
-        new SystemClock());
+        SystemClock.instance());
 
     containerStorageManager.start();
     containerStorageManager.shutdown();
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a2773f645..ef54cd935 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -52,7 +52,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       new KafkaConsumerProxy.BaseFactory[Array[Byte], 
Array[Byte]](kafkaConsumer, systemName, clientId, metrics)
 
     val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer, 
systemName, config, clientId,
-      kafkaConsumerProxyFactory, metrics, new SystemClock)
+      kafkaConsumerProxyFactory, metrics, SystemClock.instance())
     info("Created samza system consumer for system %s, config %s: %s" 
format(systemName, config, kafkaSystemConsumer))
 
     kafkaSystemConsumer
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
 
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
index 4c8d8e489..2aeaf12ea 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
+++ 
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -810,7 +810,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       loggedStoreBaseDir,
       TaskStorageManagerBuilder.defaultStoreBaseDir,
       null,
-      new SystemClock)
+      SystemClock.instance())
     this
   }
 
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index dd20248d2..13c672900 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -220,7 +220,7 @@ public class TestKafkaSystemConsumer {
     Mockito.doNothing().when(consumer).seek(new TopicPartition(TEST_STREAM, 
1), 1);
 
     KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxyFactory,
-                                                                      
kafkaSystemConsumerMetrics, new SystemClock());
+                                                                      
kafkaSystemConsumerMetrics, SystemClock.instance());
     kafkaSystemConsumer.register(testSystemStreamPartition1, testOffset);
     kafkaSystemConsumer.register(testSystemStreamPartition2, testOffset);
 

Reply via email to