[
https://issues.apache.org/jira/browse/FLINK-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689921#comment-16689921
]
ASF GitHub Bot commented on FLINK-10531:
----------------------------------------
asfgit closed pull request #7036: [FLINK-10531][e2e] Fix unstable TTL
end-to-end test.
URL: https://github.com/apache/flink/pull/7036
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is supplied below for the
sake of provenance:
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
index 3b2e4746b62..1a572f314c5 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -74,6 +75,8 @@ public static void main(String[] args) throws Exception {
setupEnvironment(env, pt);
+ final MonotonicTTLTimeProvider ttlTimeProvider =
setBackendWithCustomTTLTimeProvider(env);
+
int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(),
UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
long sleepAfterElements =
pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
@@ -90,11 +93,27 @@ public static void main(String[] args) throws Exception {
.addSource(new TtlStateUpdateSource(keySpace,
sleepAfterElements, sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
- .flatMap(new TtlVerifyUpdateFunction(ttlConfig,
reportStatAfterUpdatesNum))
+ .flatMap(new TtlVerifyUpdateFunction(ttlConfig,
ttlTimeProvider, reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");
env.execute("State TTL test job");
}
+
+ /**
+ * Sets the state backend to a new {@link StubStateBackend} which has a
{@link MonotonicTTLTimeProvider}.
+ *
+ * @param env The {@link StreamExecutionEnvironment} of the job.
+ * @return The {@link MonotonicTTLTimeProvider}.
+ */
+ private static MonotonicTTLTimeProvider
setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
+ final MonotonicTTLTimeProvider ttlTimeProvider = new
MonotonicTTLTimeProvider();
+
+ final StateBackend configuredBackend = env.getStateBackend();
+ final StateBackend stubBackend = new
StubStateBackend(configuredBackend, ttlTimeProvider);
+ env.setStateBackend(stubBackend);
+
+ return ttlTimeProvider;
+ }
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
new file mode 100644
index 00000000000..0b5637d36c4
--- /dev/null
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+
+/**
+ * A stub implementation of a {@link TtlTimeProvider} which guarantees that
+ * processing time increases monotonically.
+ */
+@NotThreadSafe
+final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /*
+ * The following variables are static because the whole TTLTimeProvider
will go
+ * through serialization and, eventually, the state backend and the
task executing
+ * the TtlVerifyUpdateFunction will have different instances of it.
+ *
+ * If these were not static, then the TtlVerifyUpdateFunction would
e.g. freeze
+ * the time, but the backend would not be notified about it, resulting
in inconsistent
+ * state.
+ *
+ * If the number of task slots per TM changes, then we may need to add
also synchronization.
+ */
+
+ private static boolean timeIsFrozen = false;
+
+ private static long lastReturnedProcessingTime = Long.MIN_VALUE;
+
+ @Override
+ public long currentTimestamp() {
+ if (timeIsFrozen && lastReturnedProcessingTime !=
Long.MIN_VALUE) {
+ return lastReturnedProcessingTime;
+ }
+
+ timeIsFrozen = true;
+
+ final long currentProcessingTime = System.currentTimeMillis();
+ if (currentProcessingTime < lastReturnedProcessingTime) {
+ return lastReturnedProcessingTime;
+ }
+
+ lastReturnedProcessingTime = currentProcessingTime;
+ return lastReturnedProcessingTime;
+ }
+
+ long unfreezeTime() {
+ timeIsFrozen = false;
+ return lastReturnedProcessingTime;
+ }
+}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
new file mode 100644
index 00000000000..b93fa362d7f
--- /dev/null
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A stub implementation of the {@link StateBackend} that allows the use of
+ * a custom {@link TtlTimeProvider}.
+ */
+final class StubStateBackend implements StateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TtlTimeProvider ttlTimeProvider;
+
+ private final StateBackend backend;
+
+ StubStateBackend(final StateBackend wrappedBackend, final
TtlTimeProvider ttlTimeProvider) {
+ this.backend = checkNotNull(wrappedBackend);
+ this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
+ }
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String
externalPointer) throws IOException {
+ return backend.resolveCheckpoint(externalPointer);
+ }
+
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws
IOException {
+ return backend.createCheckpointStorage(jobId);
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider,
+ MetricGroup metricGroup) throws Exception {
+
+ return backend.createKeyedStateBackend(
+ env,
+ jobID,
+ operatorIdentifier,
+ keySerializer,
+ numberOfKeyGroups,
+ keyGroupRange,
+ kvStateRegistry,
+ this.ttlTimeProvider,
+ metricGroup
+ );
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(Environment env,
String operatorIdentifier) throws Exception {
+ return backend.createOperatorStateBackend(env,
operatorIdentifier);
+ }
+}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
index e89b544cc66..20e21ead233 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
@@ -25,6 +25,9 @@
/** Randomly generated keyed state updates per state type. */
class TtlStateUpdate implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
private final int key;
@Nonnull
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
index 6aff14e1cd3..7404adf7771 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
@@ -33,6 +33,9 @@
* and waits for {@code sleepTime} to continue generation.
*/
class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> {
+
+ private static final long serialVersionUID = 1L;
+
private final int maxKey;
private final long sleepAfterElements;
private final long sleepTime;
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 3cfb0e2d86e..250041d0e34 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -33,7 +33,6 @@
import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
import org.apache.flink.streaming.tests.verify.ValueWithTs;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +40,14 @@
import javax.annotation.Nonnull;
import java.io.Serializable;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* Update state with TTL for each verifier.
*
@@ -62,21 +63,26 @@
* - verifies last update against previous updates
* - emits verification context in case of failure
*/
-class TtlVerifyUpdateFunction
- extends RichFlatMapFunction<TtlStateUpdate, String> implements
CheckpointedFunction {
+class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate,
String> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG =
LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);
@Nonnull
private final StateTtlConfig ttlConfig;
- private final long ttl;
+ private final MonotonicTTLTimeProvider ttlTimeProvider;
private final UpdateStat stat;
private transient Map<String, State> states;
private transient Map<String, ListState<ValueWithTs<?>>>
prevUpdatesByVerifierId;
- TtlVerifyUpdateFunction(@Nonnull StateTtlConfig ttlConfig, long
reportStatAfterUpdatesNum) {
+ TtlVerifyUpdateFunction(
+ @Nonnull StateTtlConfig ttlConfig,
+ MonotonicTTLTimeProvider ttlTimeProvider,
+ long reportStatAfterUpdatesNum) {
this.ttlConfig = ttlConfig;
- this.ttl = ttlConfig.getTtl().toMilliseconds();
+ this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
this.stat = new UpdateStat(reportStatAfterUpdatesNum);
}
@@ -91,17 +97,13 @@ public void flatMap(TtlStateUpdate updates,
Collector<String> out) throws Except
}
private TtlVerificationContext<?, ?>
generateUpdateAndVerificationContext(
- TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws
Exception {
+ TtlStateUpdate updates,
+ TtlStateVerifier<?, ?> verifier) throws Exception {
+
List<ValueWithTs<?>> prevUpdates =
getPrevUpdates(verifier.getId());
Object update = updates.getUpdate(verifier.getId());
TtlUpdateContext<?, ?> updateContext = performUpdate(verifier,
update);
- boolean clashes =
updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates);
- if (clashes) {
- resetState(verifier.getId());
- prevUpdates = Collections.emptyList();
- updateContext = performUpdate(verifier, update);
- }
- stat.update(clashes, prevUpdates.size());
+ stat.update(prevUpdates.size());
prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
return new TtlVerificationContext<>(updates.getKey(),
verifier.getId(), prevUpdates, updateContext);
}
@@ -113,33 +115,22 @@ public void flatMap(TtlStateUpdate updates,
Collector<String> out) throws Except
}
private TtlUpdateContext<?, ?> performUpdate(
- TtlStateVerifier<?, ?> verifier, Object update) throws
Exception {
+ TtlStateVerifier<?, ?> verifier,
+ Object update) throws Exception {
+
+ final long timestampBeforeUpdate =
ttlTimeProvider.currentTimestamp();
State state = states.get(verifier.getId());
- long timestampBeforeUpdate = System.currentTimeMillis();
Object valueBeforeUpdate = verifier.get(state);
verifier.update(state, update);
Object updatedValue = verifier.get(state);
- return new TtlUpdateContext<>(timestampBeforeUpdate,
- valueBeforeUpdate, update, updatedValue,
System.currentTimeMillis());
- }
+ final long timestampAfterUpdate =
ttlTimeProvider.unfreezeTime();
- private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update,
List<ValueWithTs<?>> prevUpdates) {
- return tooSlow(update) ||
- (!prevUpdates.isEmpty() &&
prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update)));
- }
-
- private boolean tooSlow(ValueWithTs<?> update) {
- return update.getTimestampAfterUpdate() -
update.getTimestampBeforeUpdate() >= ttl;
- }
+ checkState(
+ timestampAfterUpdate == timestampBeforeUpdate,
+ "Timestamps before and after the update do not
match."
+ );
- private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?>
nextUpdate) {
- return prevUpdate.getTimestampAfterUpdate() + ttl >=
nextUpdate.getTimestampBeforeUpdate() &&
- prevUpdate.getTimestampBeforeUpdate() + ttl <=
nextUpdate.getTimestampAfterUpdate();
- }
-
- private void resetState(String verifierId) {
- states.get(verifierId).clear();
- prevUpdatesByVerifierId.get(verifierId).clear();
+ return new TtlUpdateContext<>(valueBeforeUpdate, update,
updatedValue, timestampAfterUpdate);
}
@Override
@@ -153,7 +144,7 @@ public void initializeState(FunctionInitializationContext
context) {
.collect(Collectors.toMap(TtlStateVerifier::getId, v ->
v.createState(context, ttlConfig)));
prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
.collect(Collectors.toMap(TtlStateVerifier::getId, v ->
{
- Preconditions.checkNotNull(v);
+ checkNotNull(v);
TypeSerializer<ValueWithTs<?>> typeSerializer =
new ValueWithTs.Serializer(v.getUpdateSerializer());
ListStateDescriptor<ValueWithTs<?>> stateDesc =
new ListStateDescriptor<>(
"TtlPrevValueState_" + v.getId(),
typeSerializer);
@@ -165,22 +156,17 @@ public void initializeState(FunctionInitializationContext
context) {
private static class UpdateStat implements Serializable {
final long reportStatAfterUpdatesNum;
long updates = 0;
- long clashes = 0;
long prevUpdatesNum = 0;
UpdateStat(long reportStatAfterUpdatesNum) {
this.reportStatAfterUpdatesNum =
reportStatAfterUpdatesNum;
}
- void update(boolean clash, long prevUpdatesSize) {
+ void update(long prevUpdatesSize) {
updates++;
- if (clash) {
- clashes++;
- }
prevUpdatesNum += prevUpdatesSize;
if (updates % reportStatAfterUpdatesNum == 0) {
- LOG.info(String.format("Avg update chain
length: %d, clash stat: %d/%d",
- prevUpdatesNum / updates, clashes,
updates));
+ LOG.info(String.format("Avg update chain
length: %d", prevUpdatesNum / updates));
}
}
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
index 7b6def24204..46becbbbc18 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
@@ -86,15 +86,18 @@ public void update(@Nonnull State state, Object update)
throws Exception {
@Override
public boolean verify(@Nonnull TtlVerificationContext<?, ?>
verificationContextRaw) {
TtlVerificationContext<UV, GV> verificationContext =
(TtlVerificationContext<UV, GV>) verificationContextRaw;
- List<ValueWithTs<UV>> updates = new
ArrayList<>(verificationContext.getPrevUpdates());
- long currentTimestamp =
verificationContext.getUpdateContext().getTimestampBeforeUpdate();
- GV prevValue = expected(updates, currentTimestamp);
+ long currentTimestamp =
verificationContext.getUpdateContext().getTimestamp();
+
GV valueBeforeUpdate =
verificationContext.getUpdateContext().getValueBeforeUpdate();
+ List<ValueWithTs<UV>> updates = new
ArrayList<>(verificationContext.getPrevUpdates());
+ GV expectedValueBeforeUpdate = expected(updates,
currentTimestamp);
+
+ GV valueAfterUpdate =
verificationContext.getUpdateContext().getValueAfterUpdate();
ValueWithTs<UV> update =
verificationContext.getUpdateContext().getUpdateWithTs();
- GV updatedValue =
verificationContext.getUpdateContext().getUpdatedValue();
updates.add(update);
- GV expectedValue = expected(updates, currentTimestamp);
- return Objects.equals(valueBeforeUpdate, prevValue) &&
Objects.equals(updatedValue, expectedValue);
+ GV expectedValueAfterUpdate = expected(updates,
currentTimestamp);
+
+ return Objects.equals(valueBeforeUpdate,
expectedValueBeforeUpdate) && Objects.equals(valueAfterUpdate,
expectedValueAfterUpdate);
}
abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long
currentTimestamp);
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
index 960bbe72401..8a629578679 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
@@ -71,13 +71,13 @@ String expected(@Nonnull List<ValueWithTs<Integer>>
updates, long currentTimesta
return null;
}
long acc = AGG_FUNC.createAccumulator();
- long lastTs = updates.get(0).getTimestampAfterUpdate();
+ long lastTs = updates.get(0).getTimestamp();
for (ValueWithTs<Integer> update : updates) {
- if (expired(lastTs, update.getTimestampAfterUpdate())) {
+ if (expired(lastTs, update.getTimestamp())) {
acc = AGG_FUNC.createAccumulator();
}
acc = AGG_FUNC.add(update.getValue(), acc);
- lastTs = update.getTimestampAfterUpdate();
+ lastTs = update.getTimestamp();
}
return expired(lastTs, currentTimestamp) ? null :
AGG_FUNC.getResult(acc);
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
index c1cc761b0f3..bcc85905863 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
@@ -74,13 +74,13 @@ Long expected(@Nonnull List<ValueWithTs<Integer>> updates,
long currentTimestamp
return null;
}
long acc = INIT_VAL;
- long lastTs = updates.get(0).getTimestampAfterUpdate();
+ long lastTs = updates.get(0).getTimestamp();
for (ValueWithTs<Integer> update : updates) {
- if (expired(lastTs, update.getTimestampAfterUpdate())) {
+ if (expired(lastTs, update.getTimestamp())) {
acc = INIT_VAL;
}
acc += update.getValue();
- lastTs = update.getTimestampAfterUpdate();
+ lastTs = update.getTimestamp();
}
return expired(lastTs, currentTimestamp) ? null : acc;
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
index b355aa9861a..4aed98ffd2f 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
@@ -71,7 +71,7 @@ void updateInternal(@Nonnull ListState<String> state, String
update) throws Exce
@Nonnull
List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long
currentTimestamp) {
return updates.stream()
- .filter(u -> !expired(u.getTimestampAfterUpdate(),
currentTimestamp))
+ .filter(u -> !expired(u.getTimestamp(),
currentTimestamp))
.map(ValueWithTs::getValue)
.collect(Collectors.toList());
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
index a9d6b36f62f..eeda78d73c2 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
@@ -87,7 +87,7 @@ void updateInternal(@Nonnull MapState<String, String> state,
Tuple2<String, Stri
.collect(Collectors.groupingBy(u -> u.getValue().f0))
.entrySet().stream()
.map(e -> e.getValue().get(e.getValue().size() - 1))
- .filter(u -> !expired(u.getTimestampAfterUpdate(),
currentTimestamp))
+ .filter(u -> !expired(u.getTimestamp(),
currentTimestamp))
.map(ValueWithTs::getValue)
.collect(Collectors.toMap(u -> u.f0, u -> u.f1));
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
index 773be05e04b..cd33ed08f4e 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
@@ -73,13 +73,13 @@ Integer expected(@Nonnull List<ValueWithTs<Integer>>
updates, long currentTimest
return null;
}
int acc = 0;
- long lastTs = updates.get(0).getTimestampAfterUpdate();
+ long lastTs = updates.get(0).getTimestamp();
for (ValueWithTs<Integer> update : updates) {
- if (expired(lastTs, update.getTimestampAfterUpdate())) {
+ if (expired(lastTs, update.getTimestamp())) {
acc = 0;
}
acc += update.getValue();
- lastTs = update.getTimestampAfterUpdate();
+ lastTs = update.getTimestamp();
}
return expired(lastTs, currentTimestamp) ? null : acc;
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
index 959340b408d..61bf9e22c63 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
@@ -24,25 +24,25 @@
/** Contains context relevant for state update with TTL. */
public class TtlUpdateContext<UV, GV> implements Serializable {
- private final long timestampBeforeUpdate;
+
private final GV valueBeforeUpdate;
private final UV update;
- private final GV updatedValue;
- private final long timestampAfterUpdate;
+ private final GV valueAfterUpdate;
+ private final long timestamp;
public TtlUpdateContext(
- long timestampBeforeUpdate,
- GV valueBeforeUpdate, UV update, GV updatedValue,
- long timestampAfterUpdate) {
+ GV valueBeforeUpdate,
+ UV update,
+ GV updatedValue,
+ long timestamp) {
this.valueBeforeUpdate = valueBeforeUpdate;
this.update = update;
- this.updatedValue = updatedValue;
- this.timestampBeforeUpdate = timestampBeforeUpdate;
- this.timestampAfterUpdate = timestampAfterUpdate;
+ this.valueAfterUpdate = updatedValue;
+ this.timestamp = timestamp;
}
- long getTimestampBeforeUpdate() {
- return timestampBeforeUpdate;
+ long getTimestamp() {
+ return timestamp;
}
GV getValueBeforeUpdate() {
@@ -51,21 +51,20 @@ GV getValueBeforeUpdate() {
@Nonnull
public ValueWithTs<UV> getUpdateWithTs() {
- return new ValueWithTs<>(update, timestampBeforeUpdate,
timestampAfterUpdate);
+ return new ValueWithTs<>(update, timestamp);
}
- GV getUpdatedValue() {
- return updatedValue;
+ GV getValueAfterUpdate() {
+ return valueAfterUpdate;
}
@Override
public String toString() {
return "TtlUpdateContext{" +
- "timestampBeforeUpdate=" + timestampBeforeUpdate +
- ", valueBeforeUpdate=" + valueBeforeUpdate +
+ "valueBeforeUpdate=" + valueBeforeUpdate +
", update=" + update +
- ", updatedValue=" + updatedValue +
- ", timestampAfterUpdate=" + timestampAfterUpdate +
+ ", valueAfterUpdate=" + valueAfterUpdate +
+ ", timestamp=" + timestamp +
'}';
}
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
index fa4929b933f..d8bdfd4e79d 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
@@ -61,6 +61,6 @@ String expected(@Nonnull List<ValueWithTs<String>> updates,
long currentTimestam
return null;
}
ValueWithTs<String> lastUpdate = updates.get(updates.size() -
1);
- return expired(lastUpdate.getTimestampAfterUpdate(),
currentTimestamp) ? null : lastUpdate.getValue();
+ return expired(lastUpdate.getTimestamp(), currentTimestamp) ?
null : lastUpdate.getValue();
}
}
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
index 4c985cd525b..6d04c4cb219 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
@@ -36,10 +36,10 @@
@SuppressWarnings("unchecked")
public TtlVerificationContext(
- int key,
- @Nonnull String verifierId,
- @Nonnull List<ValueWithTs<?>> prevUpdates,
- @Nonnull TtlUpdateContext<?, ?> updateContext) {
+ int key,
+ @Nonnull String verifierId,
+ @Nonnull List<ValueWithTs<?>> prevUpdates,
+ @Nonnull TtlUpdateContext<?, ?> updateContext) {
this.key = key;
this.verifierId = verifierId;
this.prevUpdates = new ArrayList<>();
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
index 9302377d9ed..a4f30804e4f 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -30,33 +30,26 @@
/** User state value with timestamps before and after update. */
public class ValueWithTs<V> implements Serializable {
private final V value;
- private final long timestampBeforeUpdate;
- private final long timestampAfterUpdate;
+ private final long timestamp;
- public ValueWithTs(V value, long timestampBeforeUpdate, long
timestampAfterUpdate) {
+ public ValueWithTs(V value, long timestamp) {
this.value = value;
- this.timestampBeforeUpdate = timestampBeforeUpdate;
- this.timestampAfterUpdate = timestampAfterUpdate;
+ this.timestamp = timestamp;
}
V getValue() {
return value;
}
- public long getTimestampBeforeUpdate() {
- return timestampBeforeUpdate;
- }
-
- public long getTimestampAfterUpdate() {
- return timestampAfterUpdate;
+ long getTimestamp() {
+ return timestamp;
}
@Override
public String toString() {
return "ValueWithTs{" +
"value=" + value +
- ", timestampBeforeUpdate=" + timestampBeforeUpdate +
- ", timestampAfterUpdate=" + timestampAfterUpdate +
+ ", timestamp=" + timestamp +
'}';
}
@@ -64,7 +57,7 @@ public String toString() {
public static class Serializer extends
CompositeSerializer<ValueWithTs<?>> {
public Serializer(TypeSerializer<?> userValueSerializer) {
- super(true, userValueSerializer,
LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+ super(true, userValueSerializer,
LongSerializer.INSTANCE);
}
@SuppressWarnings("unchecked")
@@ -74,7 +67,7 @@ public Serializer(TypeSerializer<?> userValueSerializer) {
@Override
public ValueWithTs<?> createInstance(@Nonnull Object ...
values) {
- return new ValueWithTs<>(values[0], (Long) values[1],
(Long) values[2]);
+ return new ValueWithTs<>(values[0], (Long) values[1]);
}
@Override
@@ -88,9 +81,7 @@ protected Object getField(@Nonnull ValueWithTs<?> value, int
index) {
case 0:
return value.getValue();
case 1:
- return value.getTimestampBeforeUpdate();
- case 2:
- return value.getTimestampAfterUpdate();
+ return value.getTimestamp();
default:
throw new
FlinkRuntimeException("Unexpected field index for ValueWithTs");
}
@@ -99,8 +90,8 @@ protected Object getField(@Nonnull ValueWithTs<?> value, int
index) {
@SuppressWarnings("unchecked")
@Override
protected CompositeSerializer<ValueWithTs<?>>
createSerializerInstance(
- PrecomputedParameters precomputed,
- TypeSerializer<?>... originalSerializers) {
+ PrecomputedParameters precomputed,
+ TypeSerializer<?>... originalSerializers) {
return new Serializer(precomputed,
(TypeSerializer<Object>) originalSerializers[0]);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> State TTL RocksDb backend end-to-end test failed on Travis
> ----------------------------------------------------------
>
> Key: FLINK-10531
> URL: https://issues.apache.org/jira/browse/FLINK-10531
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.6.1
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
>
> The {{State TTL RocksDb backend end-to-end test}} failed on
> Travis.
> https://travis-ci.org/apache/flink/jobs/438226190
> https://api.travis-ci.org/v3/job/438226190/log.txt
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)