This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6a66f7a [FLINK-11825][StateBackends] Resolve name clash of StateTTL
TimeCharacteristic class.
6a66f7a is described below
commit 6a66f7acf370e12ad65ee24293ed47d2c5db225c
Author: klion26 <[email protected]>
AuthorDate: Thu Mar 7 15:07:19 2019 +0800
[FLINK-11825][StateBackends] Resolve name clash of StateTTL
TimeCharacteristic class.
We cannot remove the class StateTtlConfig#TimeCharacteristic and use
org.apache.flink.streaming.api.TimeCharacteristic directly,
because StateTtlConfig is located in module flink-core and
org.apache.flink.streaming.api.TimeCharacteristic is located in
flink-streaming-java,
so we chose to rename StateTtlConfig#TimeCharacteristic instead.
Changes include:
- Deprecate the StateTtlConfig#TimeCharacteristic class (for
backward-compatibility).
- Introduce a new class named StateTtlConfig#TtlTimeCharacteristic.
---
.../flink/api/common/state/StateTtlConfig.java | 62 ++++++++++++++++------
1 file changed, 46 insertions(+), 16 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 5bb44d1..2a78f19 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.state;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;
@@ -29,8 +30,10 @@ import java.io.Serializable;
import java.util.EnumMap;
import static
org.apache.flink.api.common.state.StateTtlConfig.StateVisibility.NeverReturnExpired;
-import static
org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic.ProcessingTime;
+import static
org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic.ProcessingTime;
import static
org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCreateAndWrite;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Configuration of state TTL logic.
@@ -41,6 +44,7 @@ import static
org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCrea
* it can be wrapped with {@link
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
* at the cost of an extra byte in the serialized form.
*/
+@PublicEvolving
public class StateTtlConfig implements Serializable {
private static final long serialVersionUID = -7592693245044289793L;
@@ -72,31 +76,41 @@ public class StateTtlConfig implements Serializable {
/**
* This option configures time scale to use for ttl.
+ *
+ * @deprecated will be removed in a future version in favor of {@link
TtlTimeCharacteristic}
*/
+ @Deprecated
public enum TimeCharacteristic {
/** Processing time, see also
<code>TimeCharacteristic.ProcessingTime</code>. */
ProcessingTime
}
+ /**
+ * This option configures time scale to use for ttl.
+ */
+ public enum TtlTimeCharacteristic {
+ /** Processing time, see also
<code>org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>.
*/
+ ProcessingTime
+ }
+
private final UpdateType updateType;
private final StateVisibility stateVisibility;
- private final TimeCharacteristic timeCharacteristic;
+ private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
private StateTtlConfig(
UpdateType updateType,
StateVisibility stateVisibility,
- TimeCharacteristic timeCharacteristic,
+ TtlTimeCharacteristic ttlTimeCharacteristic,
Time ttl,
CleanupStrategies cleanupStrategies) {
- this.updateType = Preconditions.checkNotNull(updateType);
- this.stateVisibility =
Preconditions.checkNotNull(stateVisibility);
- this.timeCharacteristic =
Preconditions.checkNotNull(timeCharacteristic);
- this.ttl = Preconditions.checkNotNull(ttl);
+ this.updateType = checkNotNull(updateType);
+ this.stateVisibility = checkNotNull(stateVisibility);
+ this.ttlTimeCharacteristic =
checkNotNull(ttlTimeCharacteristic);
+ this.ttl = checkNotNull(ttl);
this.cleanupStrategies = cleanupStrategies;
- Preconditions.checkArgument(ttl.toMilliseconds() > 0,
- "TTL is expected to be positive");
+ checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be
positive.");
}
@Nonnull
@@ -115,8 +129,8 @@ public class StateTtlConfig implements Serializable {
}
@Nonnull
- public TimeCharacteristic getTimeCharacteristic() {
- return timeCharacteristic;
+ public TtlTimeCharacteristic getTtlTimeCharacteristic() {
+ return ttlTimeCharacteristic;
}
public boolean isEnabled() {
@@ -133,7 +147,7 @@ public class StateTtlConfig implements Serializable {
return "StateTtlConfig{" +
"updateType=" + updateType +
", stateVisibility=" + stateVisibility +
- ", timeCharacteristic=" + timeCharacteristic +
+ ", ttlTimeCharacteristic=" + ttlTimeCharacteristic +
", ttl=" + ttl +
'}';
}
@@ -150,7 +164,7 @@ public class StateTtlConfig implements Serializable {
private UpdateType updateType = OnCreateAndWrite;
private StateVisibility stateVisibility = NeverReturnExpired;
- private TimeCharacteristic timeCharacteristic = ProcessingTime;
+ private TtlTimeCharacteristic ttlTimeCharacteristic =
ProcessingTime;
private Time ttl;
private CleanupStrategies cleanupStrategies = new
CleanupStrategies();
@@ -204,16 +218,32 @@ public class StateTtlConfig implements Serializable {
* Sets the time characteristic.
*
* @param timeCharacteristic The time characteristic configures
time scale to use for ttl.
+ *
+ * @deprecated will be removed in a future version in favor of
{@link #setTtlTimeCharacteristic}
*/
+ @Deprecated
@Nonnull
public Builder setTimeCharacteristic(@Nonnull
TimeCharacteristic timeCharacteristic) {
- this.timeCharacteristic = timeCharacteristic;
+
checkArgument(timeCharacteristic.equals(TimeCharacteristic.ProcessingTime),
+ "Only support
TimeCharacteristic.ProcessingTime, this function has replaced by
setTtlTimeCharacteristic.");
+
setTtlTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime);
+ return this;
+ }
+
+ /**
+ * Sets the time characteristic.
+ *
+ * @param ttlTimeCharacteristic The time characteristic
configures time scale to use for ttl.
+ */
+ @Nonnull
+ public Builder setTtlTimeCharacteristic(@Nonnull
TtlTimeCharacteristic ttlTimeCharacteristic) {
+ this.ttlTimeCharacteristic = ttlTimeCharacteristic;
return this;
}
@Nonnull
public Builder useProcessingTime() {
- return
setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ return setTtlTimeCharacteristic(ProcessingTime);
}
/** Cleanup expired state in full snapshot on checkpoint. */
@@ -312,7 +342,7 @@ public class StateTtlConfig implements Serializable {
return new StateTtlConfig(
updateType,
stateVisibility,
- timeCharacteristic,
+ ttlTimeCharacteristic,
ttl,
cleanupStrategies);
}