This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5b06e96  MINOR: Use ApiUtils' methods static imported consistently (#9763)
5b06e96 is described below

commit 5b06e9690b0fba37ac369149f573b27abc39a721
Author: Dongxu Wang <[email protected]>
AuthorDate: Thu Dec 24 00:22:30 2020 +0800

    MINOR: Use ApiUtils' methods static imported consistently (#9763)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../main/java/org/apache/kafka/streams/KafkaStreams.java   |  4 ++--
 .../java/org/apache/kafka/streams/kstream/JoinWindows.java | 10 +++++-----
 .../org/apache/kafka/streams/kstream/Materialized.java     |  4 ++--
 .../org/apache/kafka/streams/kstream/SessionWindows.java   |  6 +++---
 .../org/apache/kafka/streams/kstream/SlidingWindows.java   |  6 +++---
 .../java/org/apache/kafka/streams/kstream/TimeWindows.java |  8 ++++----
 .../streams/processor/internals/ProcessorContextImpl.java  |  4 ++--
 .../main/java/org/apache/kafka/streams/state/Stores.java   | 14 +++++++-------
 8 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bd3e60d..9fc492f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -41,7 +41,6 @@ import 
org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -97,6 +96,7 @@ import java.util.function.Consumer;
 import static 
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
 import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
 
 /**
@@ -1215,7 +1215,7 @@ public class KafkaStreams implements AutoCloseable {
      */
     public synchronized boolean close(final Duration timeout) throws 
IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
-        final long timeoutMs = ApiUtils.validateMillisecondDuration(timeout, 
msgPrefix);
+        final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 5a180a2..1019875 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.time.Duration;
@@ -24,6 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 /**
@@ -133,7 +133,7 @@ public final class JoinWindows extends Windows<Window> {
      */
     public static JoinWindows of(final Duration timeDifference) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        return of(ApiUtils.validateMillisecondDuration(timeDifference, 
msgPrefix));
+        return of(validateMillisecondDuration(timeDifference, msgPrefix));
     }
 
     /**
@@ -164,7 +164,7 @@ public final class JoinWindows extends Windows<Window> {
      */
     public JoinWindows before(final Duration timeDifference) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        return before(ApiUtils.validateMillisecondDuration(timeDifference, 
msgPrefix));
+        return before(validateMillisecondDuration(timeDifference, msgPrefix));
     }
 
     /**
@@ -195,7 +195,7 @@ public final class JoinWindows extends Windows<Window> {
      */
     public JoinWindows after(final Duration timeDifference) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        return after(ApiUtils.validateMillisecondDuration(timeDifference, 
msgPrefix));
+        return after(validateMillisecondDuration(timeDifference, msgPrefix));
     }
 
     /**
@@ -227,7 +227,7 @@ public final class JoinWindows extends Windows<Window> {
     @SuppressWarnings("deprecation") // removing segments from Windows will 
fix this
     public JoinWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        final long afterWindowEndMs = 
ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
         if (afterWindowEndMs < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 3804932..0425fb7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -34,6 +33,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 
 /**
  * Used to describe how a {@link StateStore} should be materialized.
@@ -250,7 +250,7 @@ public class Materialized<K, V, S extends StateStore> {
      */
     public Materialized<K, V, S> withRetention(final Duration retention) 
throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention, 
"retention");
-        final long retenationMs = 
ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+        final long retenationMs = validateMillisecondDuration(retention, 
msgPrefix);
 
         if (retenationMs < 0) {
             throw new IllegalArgumentException("Retention must not be 
negative.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index a67d001..6df393c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 
@@ -24,6 +23,7 @@ import java.time.Duration;
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 
@@ -110,7 +110,7 @@ public final class SessionWindows {
      */
     public static SessionWindows with(final Duration inactivityGap) {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
-        return with(ApiUtils.validateMillisecondDuration(inactivityGap, 
msgPrefix));
+        return with(validateMillisecondDuration(inactivityGap, msgPrefix));
     }
 
     /**
@@ -147,7 +147,7 @@ public final class SessionWindows {
      */
     public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        final long afterWindowEndMs = 
ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
 
         if (afterWindowEndMs < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 394b58f..189770f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import java.time.Duration;
 import java.util.Objects;
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 
 /**
  * A sliding window used for aggregating events.
@@ -92,12 +92,12 @@ public final class SlidingWindows {
      */
     public static SlidingWindows withTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration grace) throws IllegalArgumentException {
         final String msgPrefixSize = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        final long timeDifferenceMs = 
ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+        final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefixSize);
         if (timeDifferenceMs < 0) {
             throw new IllegalArgumentException("Window time difference must 
not be negative.");
         }
         final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, 
"grace");
-        final long graceMs = ApiUtils.validateMillisecondDuration(grace, 
msgPrefixGrace);
+        final long graceMs = validateMillisecondDuration(grace, 
msgPrefixGrace);
         if (graceMs < 0) {
             throw new IllegalArgumentException("Window grace period must not 
be negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index ba90fb2..cd52dd5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream;
 
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -27,6 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
 
 /**
@@ -129,7 +129,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @SuppressWarnings("deprecation") // removing #of(final long sizeMs) will 
fix this
     public static TimeWindows of(final Duration size) throws 
IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
-        return of(ApiUtils.validateMillisecondDuration(size, msgPrefix));
+        return of(validateMillisecondDuration(size, msgPrefix));
     }
 
     /**
@@ -167,7 +167,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @SuppressWarnings("deprecation") // removing #advanceBy(final long 
advanceMs) will fix this
     public TimeWindows advanceBy(final Duration advance) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, 
"advance");
-        return advanceBy(ApiUtils.validateMillisecondDuration(advance, 
msgPrefix));
+        return advanceBy(validateMillisecondDuration(advance, msgPrefix));
     }
 
     @Override
@@ -200,7 +200,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @SuppressWarnings("deprecation") // will be fixed when we remove segments 
from Windows
     public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        final long afterWindowEndMs = 
ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
         if (afterWindowEndMs < 0) {
             throw new IllegalArgumentException("Grace period must not be 
negative.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index bac9437..372384f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -39,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
 import static 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
@@ -310,7 +310,7 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
                                 final Punctuator callback) throws 
IllegalArgumentException {
         throwUnsupportedOperationExceptionIfStandby("schedule");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, 
"interval");
-        return schedule(ApiUtils.validateMillisecondDuration(interval, 
msgPrefix), type, callback);
+        return schedule(validateMillisecondDuration(interval, msgPrefix), 
type, callback);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c986450..01016b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import 
org.apache.kafka.streams.state.internals.InMemorySessionBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
@@ -37,6 +36,7 @@ import java.time.Duration;
 import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 
 /**
  * Factory for creating state stores in Kafka Streams.
@@ -275,9 +275,9 @@ public final class Stores {
                                                                   final 
boolean timestampedStore) {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        final long retentionMs = 
ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
         final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
-        final long windowSizeMs = 
ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
+        final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
 
         final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
 
@@ -340,13 +340,13 @@ public final class Stores {
         Objects.requireNonNull(name, "name cannot be null");
 
         final String repartitionPeriodErrorMessagePrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        final long retentionMs = 
ApiUtils.validateMillisecondDuration(retentionPeriod, 
repartitionPeriodErrorMessagePrefix);
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, 
repartitionPeriodErrorMessagePrefix);
         if (retentionMs < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
         }
 
         final String windowSizeErrorMessagePrefix = 
prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
-        final long windowSizeMs = 
ApiUtils.validateMillisecondDuration(windowSize, windowSizeErrorMessagePrefix);
+        final long windowSizeMs = validateMillisecondDuration(windowSize, 
windowSizeErrorMessagePrefix);
         if (windowSizeMs < 0L) {
             throw new IllegalArgumentException("windowSize cannot be 
negative");
         }
@@ -393,7 +393,7 @@ public final class Stores {
     public static SessionBytesStoreSupplier persistentSessionStore(final 
String name,
                                                                    final 
Duration retentionPeriod) {
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        return persistentSessionStore(name, 
ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
+        return persistentSessionStore(name, 
validateMillisecondDuration(retentionPeriod, msgPrefix));
     }
 
     /**
@@ -409,7 +409,7 @@ public final class Stores {
         Objects.requireNonNull(name, "name cannot be null");
 
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        final long retentionPeriodMs = 
ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
+        final long retentionPeriodMs = 
validateMillisecondDuration(retentionPeriod, msgPrefix);
         if (retentionPeriodMs < 0) {
             throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
         }

Reply via email to