This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5b06e96 MINOR: Use ApiUtils' methods static imported consistently (#9763)
5b06e96 is described below
commit 5b06e9690b0fba37ac369149f573b27abc39a721
Author: Dongxu Wang <[email protected]>
AuthorDate: Thu Dec 24 00:22:30 2020 +0800
MINOR: Use ApiUtils' methods static imported consistently (#9763)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../main/java/org/apache/kafka/streams/KafkaStreams.java | 4 ++--
.../java/org/apache/kafka/streams/kstream/JoinWindows.java | 10 +++++-----
.../org/apache/kafka/streams/kstream/Materialized.java | 4 ++--
.../org/apache/kafka/streams/kstream/SessionWindows.java | 6 +++---
.../org/apache/kafka/streams/kstream/SlidingWindows.java | 6 +++---
.../java/org/apache/kafka/streams/kstream/TimeWindows.java | 8 ++++----
.../streams/processor/internals/ProcessorContextImpl.java | 4 ++--
.../main/java/org/apache/kafka/streams/state/Stores.java | 14 +++++++-------
8 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bd3e60d..9fc492f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -41,7 +41,6 @@ import
org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -97,6 +96,7 @@ import java.util.function.Consumer;
import static
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
/**
@@ -1215,7 +1215,7 @@ public class KafkaStreams implements AutoCloseable {
*/
public synchronized boolean close(final Duration timeout) throws
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout,
"timeout");
- final long timeoutMs = ApiUtils.validateMillisecondDuration(timeout,
msgPrefix);
+ final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 5a180a2..1019875 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.time.Duration;
@@ -24,6 +23,7 @@ import java.util.Map;
import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
@@ -133,7 +133,7 @@ public final class JoinWindows extends Windows<Window> {
*/
public static JoinWindows of(final Duration timeDifference) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- return of(ApiUtils.validateMillisecondDuration(timeDifference,
msgPrefix));
+ return of(validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
@@ -164,7 +164,7 @@ public final class JoinWindows extends Windows<Window> {
*/
public JoinWindows before(final Duration timeDifference) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- return before(ApiUtils.validateMillisecondDuration(timeDifference,
msgPrefix));
+ return before(validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
@@ -195,7 +195,7 @@ public final class JoinWindows extends Windows<Window> {
*/
public JoinWindows after(final Duration timeDifference) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- return after(ApiUtils.validateMillisecondDuration(timeDifference,
msgPrefix));
+ return after(validateMillisecondDuration(timeDifference, msgPrefix));
}
/**
@@ -227,7 +227,7 @@ public final class JoinWindows extends Windows<Window> {
@SuppressWarnings("deprecation") // removing segments from Windows will
fix this
public JoinWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- final long afterWindowEndMs =
ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be
negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 3804932..0425fb7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -34,6 +33,7 @@ import java.util.Map;
import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
/**
* Used to describe how a {@link StateStore} should be materialized.
@@ -250,7 +250,7 @@ public class Materialized<K, V, S extends StateStore> {
*/
public Materialized<K, V, S> withRetention(final Duration retention)
throws IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(retention,
"retention");
- final long retenationMs =
ApiUtils.validateMillisecondDuration(retention, msgPrefix);
+ final long retenationMs = validateMillisecondDuration(retention,
msgPrefix);
if (retenationMs < 0) {
throw new IllegalArgumentException("Retention must not be
negative.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index a67d001..6df393c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
@@ -24,6 +23,7 @@ import java.time.Duration;
import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
@@ -110,7 +110,7 @@ public final class SessionWindows {
*/
public static SessionWindows with(final Duration inactivityGap) {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
- return with(ApiUtils.validateMillisecondDuration(inactivityGap,
msgPrefix));
+ return with(validateMillisecondDuration(inactivityGap, msgPrefix));
}
/**
@@ -147,7 +147,7 @@ public final class SessionWindows {
*/
public SessionWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- final long afterWindowEndMs =
ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be
negative.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 394b58f..189770f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -16,11 +16,11 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.time.Duration;
import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
/**
* A sliding window used for aggregating events.
@@ -92,12 +92,12 @@ public final class SlidingWindows {
*/
public static SlidingWindows withTimeDifferenceAndGrace(final Duration
timeDifference, final Duration grace) throws IllegalArgumentException {
final String msgPrefixSize =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- final long timeDifferenceMs =
ApiUtils.validateMillisecondDuration(timeDifference, msgPrefixSize);
+ final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, msgPrefixSize);
if (timeDifferenceMs < 0) {
throw new IllegalArgumentException("Window time difference must
not be negative.");
}
final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace,
"grace");
- final long graceMs = ApiUtils.validateMillisecondDuration(grace,
msgPrefixGrace);
+ final long graceMs = validateMillisecondDuration(grace,
msgPrefixGrace);
if (graceMs < 0) {
throw new IllegalArgumentException("Window grace period must not
be negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index ba90fb2..cd52dd5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -27,6 +26,7 @@ import java.util.Map;
import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
/**
@@ -129,7 +129,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@SuppressWarnings("deprecation") // removing #of(final long sizeMs) will
fix this
public static TimeWindows of(final Duration size) throws
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
- return of(ApiUtils.validateMillisecondDuration(size, msgPrefix));
+ return of(validateMillisecondDuration(size, msgPrefix));
}
/**
@@ -167,7 +167,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@SuppressWarnings("deprecation") // removing #advanceBy(final long
advanceMs) will fix this
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance,
"advance");
- return advanceBy(ApiUtils.validateMillisecondDuration(advance,
msgPrefix));
+ return advanceBy(validateMillisecondDuration(advance, msgPrefix));
}
@Override
@@ -200,7 +200,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@SuppressWarnings("deprecation") // will be fixed when we remove segments
from Windows
public TimeWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- final long afterWindowEndMs =
ApiUtils.validateMillisecondDuration(afterWindowEnd, msgPrefix);
+ final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be
negative.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index bac9437..372384f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -39,6 +38,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static
org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
import static
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
@@ -310,7 +310,7 @@ public class ProcessorContextImpl extends
AbstractProcessorContext implements Re
final Punctuator callback) throws
IllegalArgumentException {
throwUnsupportedOperationExceptionIfStandby("schedule");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval,
"interval");
- return schedule(ApiUtils.validateMillisecondDuration(interval,
msgPrefix), type, callback);
+ return schedule(validateMillisecondDuration(interval, msgPrefix),
type, callback);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c986450..01016b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import
org.apache.kafka.streams.state.internals.InMemorySessionBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
@@ -37,6 +36,7 @@ import java.time.Duration;
import java.util.Objects;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
/**
* Factory for creating state stores in Kafka Streams.
@@ -275,9 +275,9 @@ public final class Stores {
final
boolean timestampedStore) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- final long retentionMs =
ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+ final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize,
"windowSize");
- final long windowSizeMs =
ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
+ final long windowSizeMs = validateMillisecondDuration(windowSize,
wsMsgPrefix);
final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
@@ -340,13 +340,13 @@ public final class Stores {
Objects.requireNonNull(name, "name cannot be null");
final String repartitionPeriodErrorMessagePrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- final long retentionMs =
ApiUtils.validateMillisecondDuration(retentionPeriod,
repartitionPeriodErrorMessagePrefix);
+ final long retentionMs = validateMillisecondDuration(retentionPeriod,
repartitionPeriodErrorMessagePrefix);
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be
negative");
}
final String windowSizeErrorMessagePrefix =
prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
- final long windowSizeMs =
ApiUtils.validateMillisecondDuration(windowSize, windowSizeErrorMessagePrefix);
+ final long windowSizeMs = validateMillisecondDuration(windowSize,
windowSizeErrorMessagePrefix);
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be
negative");
}
@@ -393,7 +393,7 @@ public final class Stores {
public static SessionBytesStoreSupplier persistentSessionStore(final
String name,
final
Duration retentionPeriod) {
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- return persistentSessionStore(name,
ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
+ return persistentSessionStore(name,
validateMillisecondDuration(retentionPeriod, msgPrefix));
}
/**
@@ -409,7 +409,7 @@ public final class Stores {
Objects.requireNonNull(name, "name cannot be null");
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- final long retentionPeriodMs =
ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix);
+ final long retentionPeriodMs =
validateMillisecondDuration(retentionPeriod, msgPrefix);
if (retentionPeriodMs < 0) {
throw new IllegalArgumentException("retentionPeriod cannot be
negative");
}