This is an automated email from the ASF dual-hosted git repository.
kw2542 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 702fd815f SAMZA-2753: Consolidate Clocks (#1617)
702fd815f is described below
commit 702fd815fe40bfb23e8d3412e5e8848b585a5d69
Author: Ke Wu <[email protected]>
AuthorDate: Thu Jul 21 11:06:28 2022 -0700
SAMZA-2753: Consolidate Clocks (#1617)
* SAMZA-2753: Consolidate Clocks
Issues: Improve the usability of clock-related interfaces/classes
Changes:
1. Move SystemClock from scala to java
2. Enrich SystemHighResolutionClock for ease of use
3. Add comments to Clock
API Changes: N/A
Upgrade Instructions: N/A
Usage Instructions: N/A
* Fix constructions of SystemClock
* Fix checkstyle
---
.../src/main/java/org/apache/samza/util/Clock.java | 12 +++++++++++-
.../org/apache/samza/storage/StorageRecovery.java | 2 +-
.../org/apache/samza/util/HighResolutionClock.java | 2 +-
.../org/apache/samza/util/SystemClock.java} | 19 +++++++++++++------
.../apache/samza/util/SystemHighResolutionClock.java | 14 +++++++++++++-
.../org/apache/samza/util/ThrottlingExecutor.java | 2 +-
.../org/apache/samza/util/ThrottlingScheduler.java | 2 +-
.../org/apache/samza/container/SamzaContainer.scala | 4 ++--
.../org/apache/samza/operators/TestJoinOperator.java | 20 ++++++++++----------
.../samza/storage/TestContainerStorageManager.java | 4 ++--
.../samza/system/kafka/KafkaSystemFactory.scala | 2 +-
.../samza/storage/TestTaskStorageManager.scala | 2 +-
.../samza/system/kafka/TestKafkaSystemConsumer.java | 2 +-
13 files changed, 58 insertions(+), 29 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/util/Clock.java
b/samza-api/src/main/java/org/apache/samza/util/Clock.java
index db92114a0..34b4de5ae 100644
--- a/samza-api/src/main/java/org/apache/samza/util/Clock.java
+++ b/samza-api/src/main/java/org/apache/samza/util/Clock.java
@@ -20,8 +20,18 @@
package org.apache.samza.util;
/**
- * Mockable interface for tracking time.
+ * An object that can provide time points (useful for getting the elapsed time
between two time
+ * points).
+ * <p>
+ * Instances of this interface must be thread-safe.
*/
public interface Clock {
+ /**
+ * Returns a time point that can be used to calculate the difference in
milliseconds with another
+ * time point. Resolution of the timer is platform dependent and not
guaranteed to actually
+ * operate at millisecond precision.
+ *
+ * @return current time point in milliseconds
+ */
long currentTimeMillis();
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 6b657d59d..bd4a537df 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -252,7 +252,7 @@ public class StorageRecovery {
storeBaseDir,
storeBaseDir,
null,
- new SystemClock());
+ SystemClock.instance());
this.containerStorageManagers.put(containerModel.getId(),
containerStorageManager);
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
index 6d40149d9..f52b17ade 100644
--- a/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
+++ b/samza-core/src/main/java/org/apache/samza/util/HighResolutionClock.java
@@ -21,7 +21,7 @@ package org.apache.samza.util;
/**
* An object that can provide time points (useful for getting the elapsed time
between two time
- * points) and can sleep for a specified period of time.
+ * points).
* <p>
* Instances of this interface must be thread-safe.
*/
diff --git a/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
b/samza-core/src/main/java/org/apache/samza/util/SystemClock.java
similarity index 73%
rename from samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
rename to samza-core/src/main/java/org/apache/samza/util/SystemClock.java
index 62253d63c..bc184c709 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
+++ b/samza-core/src/main/java/org/apache/samza/util/SystemClock.java
@@ -17,16 +17,23 @@
* under the License.
*/
-package org.apache.samza.util
+package org.apache.samza.util;
/**
* Default implementation of the Clock interface, which uses the real
* system clock.
*/
-class SystemClock extends Clock {
- override def currentTimeMillis = System.currentTimeMillis
-}
+public class SystemClock implements Clock {
+ private static final SystemClock INSTANCE = new SystemClock();
+
+ private SystemClock() {}
+
+ public static SystemClock instance() {
+ return INSTANCE;
+ }
-object SystemClock {
- val instance = new SystemClock
+ @Override
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
index 6bfe7c6af..cd62a773a 100644
---
a/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
+++
b/samza-core/src/main/java/org/apache/samza/util/SystemHighResolutionClock.java
@@ -19,7 +19,19 @@
package org.apache.samza.util;
-class SystemHighResolutionClock implements HighResolutionClock {
+/**
+ * Default implementation of the HighResolutionClock interface, which uses the
+ * real system clock.
+ */
+public class SystemHighResolutionClock implements HighResolutionClock {
+ private static final SystemHighResolutionClock INSTANCE = new
SystemHighResolutionClock();
+
+ private SystemHighResolutionClock() {}
+
+ public static SystemHighResolutionClock instance() {
+ return INSTANCE;
+ }
+
@Override
public long nanoTime() {
return System.nanoTime();
diff --git
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
index fe8dba047..54d4cdf5c 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
@@ -38,7 +38,7 @@ public class ThrottlingExecutor implements Throttleable,
Executor {
private long pendingNanos;
public ThrottlingExecutor(long maxDelayMillis) {
- this(maxDelayMillis, new SystemHighResolutionClock());
+ this(maxDelayMillis, SystemHighResolutionClock.instance());
}
ThrottlingExecutor(long maxDelayMillis, HighResolutionClock clock) {
diff --git
a/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
index 265d46d3a..fe5ac13ad 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
@@ -42,7 +42,7 @@ public class ThrottlingScheduler implements Throttleable {
public ThrottlingScheduler(long maxDelayMillis) {
this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
- this.clock = new SystemHighResolutionClock();
+ this.clock = SystemHighResolutionClock.instance();
}
ThrottlingScheduler(long maxDelayMillis, ScheduledExecutorService
scheduledExecutorService,
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e6c188db5..c157b8776 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -543,7 +543,7 @@ object SamzaContainer extends Logging {
loggedStorageBaseDir,
nonLoggedStorageBaseDir,
serdeManager,
- new SystemClock)
+ SystemClock.instance())
storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
@@ -576,7 +576,7 @@ object SamzaContainer extends Logging {
taskInstanceMetrics.get(taskName).isDefined)
taskInstanceMetrics.get(taskName).get.registry
else new MetricsRegistryMap
val taskBackupManager = factory.getBackupManager(jobContext,
containerModel,
- taskModel, commitThreadPool, taskMetricsRegistry, config, new
SystemClock,
+ taskModel, commitThreadPool, taskMetricsRegistry, config,
SystemClock.instance(),
loggedStorageBaseDir, nonLoggedStorageBaseDir)
taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager)
}
diff --git
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 907dd7d99..e2c4d72cc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -76,7 +76,7 @@ public class TestJoinOperator {
@Test
public void join() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -108,14 +108,14 @@ public class TestJoinOperator {
inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde,
kvSerde, JOIN_TTL, "join");
}, config);
- createStreamOperatorTask(new SystemClock(), streamAppDesc); // should
throw an exception
+ createStreamOperatorTask(SystemClock.instance(), streamAppDesc); // should
throw an exception
}
@Test
public void joinFnInitAndClose() throws Exception {
TestJoinFunction joinFn = new TestJoinFunction();
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(joinFn);
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
MessageCollector messageCollector = mock(MessageCollector.class);
@@ -134,7 +134,7 @@ public class TestJoinOperator {
@Test
public void joinReverse() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -150,7 +150,7 @@ public class TestJoinOperator {
@Test
public void joinNoMatch() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -165,7 +165,7 @@ public class TestJoinOperator {
@Test
public void joinNoMatchReverse() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -180,7 +180,7 @@ public class TestJoinOperator {
@Test
public void joinRetainsLatestMessageForKey() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -198,7 +198,7 @@ public class TestJoinOperator {
@Test
public void joinRetainsLatestMessageForKeyReverse() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -216,7 +216,7 @@ public class TestJoinOperator {
@Test
public void joinRetainsMatchedMessages() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
@@ -239,7 +239,7 @@ public class TestJoinOperator {
@Test
public void joinRetainsMatchedMessagesReverse() throws Exception {
StreamApplicationDescriptorImpl streamAppDesc =
this.getTestJoinStreamGraph(new TestJoinFunction());
- StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(),
streamAppDesc);
+ StreamOperatorTask sot = createStreamOperatorTask(SystemClock.instance(),
streamAppDesc);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer)
envelope.getMessage());
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index eed7d5442..810433258 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -268,7 +268,7 @@ public class TestContainerStorageManager {
DEFAULT_LOGGED_STORE_BASE_DIR,
DEFAULT_STORE_BASE_DIR,
null,
- new SystemClock());
+ SystemClock.instance());
this.testContext = new StandbyTestContext();
}
@@ -355,7 +355,7 @@ public class TestContainerStorageManager {
DEFAULT_LOGGED_STORE_BASE_DIR,
DEFAULT_STORE_BASE_DIR,
null,
- new SystemClock());
+ SystemClock.instance());
containerStorageManager.start();
containerStorageManager.shutdown();
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a2773f645..ef54cd935 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -52,7 +52,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
new KafkaConsumerProxy.BaseFactory[Array[Byte],
Array[Byte]](kafkaConsumer, systemName, clientId, metrics)
val kafkaSystemConsumer = new KafkaSystemConsumer(kafkaConsumer,
systemName, config, clientId,
- kafkaConsumerProxyFactory, metrics, new SystemClock)
+ kafkaConsumerProxyFactory, metrics, SystemClock.instance())
info("Created samza system consumer for system %s, config %s: %s"
format(systemName, config, kafkaSystemConsumer))
kafkaSystemConsumer
diff --git
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
index 4c8d8e489..2aeaf12ea 100644
---
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
+++
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -810,7 +810,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
loggedStoreBaseDir,
TaskStorageManagerBuilder.defaultStoreBaseDir,
null,
- new SystemClock)
+ SystemClock.instance())
this
}
diff --git
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index dd20248d2..13c672900 100644
---
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -220,7 +220,7 @@ public class TestKafkaSystemConsumer {
Mockito.doNothing().when(consumer).seek(new TopicPartition(TEST_STREAM,
1), 1);
KafkaSystemConsumer kafkaSystemConsumer = new
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID,
kafkaConsumerProxyFactory,
-
kafkaSystemConsumerMetrics, new SystemClock());
+
kafkaSystemConsumerMetrics, SystemClock.instance());
kafkaSystemConsumer.register(testSystemStreamPartition1, testOffset);
kafkaSystemConsumer.register(testSystemStreamPartition2, testOffset);