This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 52c2ad0  [FLINK-10531][e2e] Fix unstable TTL end-to-end test.
52c2ad0 is described below

commit 52c2ad04c25f277ed65b164f53ce278a7380f058
Author: kkloudas <kklou...@gmail.com>
AuthorDate: Tue Nov 6 18:02:10 2018 +0100

    [FLINK-10531][e2e] Fix unstable TTL end-to-end test.
    
    This closes #7036.
---
 .../tests/DataStreamStateTTLTestProgram.java       | 21 ++++-
 .../streaming/tests/MonotonicTTLTimeProvider.java  | 73 +++++++++++++++++
 .../flink/streaming/tests/StubStateBackend.java    | 94 ++++++++++++++++++++++
 .../flink/streaming/tests/TtlStateUpdate.java      |  3 +
 .../streaming/tests/TtlStateUpdateSource.java      |  3 +
 .../streaming/tests/TtlVerifyUpdateFunction.java   | 74 +++++++----------
 .../tests/verify/AbstractTtlStateVerifier.java     | 15 ++--
 .../tests/verify/TtlAggregatingStateVerifier.java  |  6 +-
 .../tests/verify/TtlFoldingStateVerifier.java      |  6 +-
 .../tests/verify/TtlListStateVerifier.java         |  2 +-
 .../tests/verify/TtlMapStateVerifier.java          |  2 +-
 .../tests/verify/TtlReducingStateVerifier.java     |  6 +-
 .../streaming/tests/verify/TtlUpdateContext.java   | 35 ++++----
 .../tests/verify/TtlValueStateVerifier.java        |  2 +-
 .../tests/verify/TtlVerificationContext.java       |  8 +-
 .../flink/streaming/tests/verify/ValueWithTs.java  | 31 +++----
 16 files changed, 276 insertions(+), 105 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
index 3b2e474..1a572f3 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 
@@ -74,6 +75,8 @@ public class DataStreamStateTTLTestProgram {
 
                setupEnvironment(env, pt);
 
+               final MonotonicTTLTimeProvider ttlTimeProvider = 
setBackendWithCustomTTLTimeProvider(env);
+
                int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), 
UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
                long sleepAfterElements = 
pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
                        
UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
@@ -90,11 +93,27 @@ public class DataStreamStateTTLTestProgram {
                        .addSource(new TtlStateUpdateSource(keySpace, 
sleepAfterElements, sleepTime))
                        .name("TtlStateUpdateSource")
                        .keyBy(TtlStateUpdate::getKey)
-                       .flatMap(new TtlVerifyUpdateFunction(ttlConfig, 
reportStatAfterUpdatesNum))
+                       .flatMap(new TtlVerifyUpdateFunction(ttlConfig, 
ttlTimeProvider, reportStatAfterUpdatesNum))
                        .name("TtlVerifyUpdateFunction")
                        .addSink(new PrintSinkFunction<>())
                        .name("PrintFailedVerifications");
 
                env.execute("State TTL test job");
        }
+
+       /**
+        * Sets the state backend to a new {@link StubStateBackend} which has a {@link MonotonicTTLTimeProvider}.
+        *
+        * @param env The {@link StreamExecutionEnvironment} of the job.
+        * @return The {@link MonotonicTTLTimeProvider}.
+        */
+       private static MonotonicTTLTimeProvider 
setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
+               final MonotonicTTLTimeProvider ttlTimeProvider = new 
MonotonicTTLTimeProvider();
+
+               final StateBackend configuredBackend = env.getStateBackend();
+               final StateBackend stubBackend = new 
StubStateBackend(configuredBackend, ttlTimeProvider);
+               env.setStateBackend(stubBackend);
+
+               return ttlTimeProvider;
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
new file mode 100644
index 0000000..0b5637d
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+
+/**
+ * A stub implementation of a {@link TtlTimeProvider} which guarantees that
+ * processing time increases monotonically.
+ */
+@NotThreadSafe
+final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /*
+        * The following variables are static because the whole TTLTimeProvider will go
+        * through serialization and, eventually, the state backend and the task executing
+        * the TtlVerifyUpdateFunction will have different instances of it.
+        *
+        * If these were not static, then the TtlVerifyUpdateFunction would e.g. freeze
+        * the time, but the backend would not be notified about it, resulting in inconsistent
+        * state.
+        *
+        * If the number of task slots per TM changes, then we may also need to add synchronization.
+        */
+
+       private static boolean timeIsFrozen = false;
+
+       private static long lastReturnedProcessingTime = Long.MIN_VALUE;
+
+       @Override
+       public long currentTimestamp() {
+               if (timeIsFrozen && lastReturnedProcessingTime != 
Long.MIN_VALUE) {
+                       return lastReturnedProcessingTime;
+               }
+
+               timeIsFrozen = true;
+
+               final long currentProcessingTime = System.currentTimeMillis();
+               if (currentProcessingTime < lastReturnedProcessingTime) {
+                       return lastReturnedProcessingTime;
+               }
+
+               lastReturnedProcessingTime = currentProcessingTime;
+               return lastReturnedProcessingTime;
+       }
+
+       long unfreezeTime() {
+               timeIsFrozen = false;
+               return lastReturnedProcessingTime;
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
new file mode 100644
index 0000000..b93fa36
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A stub implementation of the {@link StateBackend} that allows the use of
+ * a custom {@link TtlTimeProvider}.
+ */
+final class StubStateBackend implements StateBackend {
+
+       private static final long serialVersionUID = 1L;
+
+       private final TtlTimeProvider ttlTimeProvider;
+
+       private final StateBackend backend;
+
+       StubStateBackend(final StateBackend wrappedBackend, final 
TtlTimeProvider ttlTimeProvider) {
+               this.backend = checkNotNull(wrappedBackend);
+               this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
+       }
+
+       @Override
+       public CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer) throws IOException {
+               return backend.resolveCheckpoint(externalPointer);
+       }
+
+       @Override
+       public CheckpointStorage createCheckpointStorage(JobID jobId) throws 
IOException {
+               return backend.createCheckpointStorage(jobId);
+       }
+
+       @Override
+       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry,
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) throws Exception {
+
+               return backend.createKeyedStateBackend(
+                               env,
+                               jobID,
+                               operatorIdentifier,
+                               keySerializer,
+                               numberOfKeyGroups,
+                               keyGroupRange,
+                               kvStateRegistry,
+                               this.ttlTimeProvider,
+                               metricGroup
+               );
+       }
+
+       @Override
+       public OperatorStateBackend createOperatorStateBackend(Environment env, 
String operatorIdentifier) throws Exception {
+               return backend.createOperatorStateBackend(env, 
operatorIdentifier);
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
index e89b544..20e21ea 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
@@ -25,6 +25,9 @@ import java.util.Map;
 
 /** Randomly generated keyed state updates per state type. */
 class TtlStateUpdate implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
        private final int key;
 
        @Nonnull
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
index 6aff14e..7404adf 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
@@ -33,6 +33,9 @@ import java.util.stream.Collectors;
  * and waits for {@code sleepTime} to continue generation.
  */
 class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> {
+
+       private static final long serialVersionUID = 1L;
+
        private final int maxKey;
        private final long sleepAfterElements;
        private final long sleepTime;
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index 3cfb0e2..250041d 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -33,7 +33,6 @@ import 
org.apache.flink.streaming.tests.verify.TtlUpdateContext;
 import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
 import org.apache.flink.streaming.tests.verify.ValueWithTs;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +40,14 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Update state with TTL for each verifier.
  *
@@ -62,21 +63,26 @@ import java.util.stream.StreamSupport;
  * - verifies last update against previous updates
  * - emits verification context in case of failure
  */
-class TtlVerifyUpdateFunction
-       extends RichFlatMapFunction<TtlStateUpdate, String> implements 
CheckpointedFunction {
+class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, 
String> implements CheckpointedFunction {
+
+       private static final long serialVersionUID = 1L;
+
        private static final Logger LOG = 
LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);
 
        @Nonnull
        private final StateTtlConfig ttlConfig;
-       private final long ttl;
+       private final MonotonicTTLTimeProvider ttlTimeProvider;
        private final UpdateStat stat;
 
        private transient Map<String, State> states;
        private transient Map<String, ListState<ValueWithTs<?>>> 
prevUpdatesByVerifierId;
 
-       TtlVerifyUpdateFunction(@Nonnull StateTtlConfig ttlConfig, long 
reportStatAfterUpdatesNum) {
+       TtlVerifyUpdateFunction(
+                       @Nonnull StateTtlConfig ttlConfig,
+                       MonotonicTTLTimeProvider ttlTimeProvider,
+                       long reportStatAfterUpdatesNum) {
                this.ttlConfig = ttlConfig;
-               this.ttl = ttlConfig.getTtl().toMilliseconds();
+               this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
                this.stat = new UpdateStat(reportStatAfterUpdatesNum);
        }
 
@@ -91,17 +97,13 @@ class TtlVerifyUpdateFunction
        }
 
        private TtlVerificationContext<?, ?> 
generateUpdateAndVerificationContext(
-               TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws 
Exception {
+                       TtlStateUpdate updates,
+                       TtlStateVerifier<?, ?> verifier) throws Exception {
+
                List<ValueWithTs<?>> prevUpdates = 
getPrevUpdates(verifier.getId());
                Object update = updates.getUpdate(verifier.getId());
                TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, 
update);
-               boolean clashes = 
updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates);
-               if (clashes) {
-                       resetState(verifier.getId());
-                       prevUpdates = Collections.emptyList();
-                       updateContext = performUpdate(verifier, update);
-               }
-               stat.update(clashes, prevUpdates.size());
+               stat.update(prevUpdates.size());
                
prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
                return new TtlVerificationContext<>(updates.getKey(), 
verifier.getId(), prevUpdates, updateContext);
        }
@@ -113,33 +115,22 @@ class TtlVerifyUpdateFunction
        }
 
        private TtlUpdateContext<?, ?> performUpdate(
-               TtlStateVerifier<?, ?> verifier, Object update) throws 
Exception {
+                       TtlStateVerifier<?, ?> verifier,
+                       Object update) throws Exception {
+
+               final long timestampBeforeUpdate = 
ttlTimeProvider.currentTimestamp();
                State state = states.get(verifier.getId());
-               long timestampBeforeUpdate = System.currentTimeMillis();
                Object valueBeforeUpdate = verifier.get(state);
                verifier.update(state, update);
                Object updatedValue = verifier.get(state);
-               return new TtlUpdateContext<>(timestampBeforeUpdate,
-                       valueBeforeUpdate, update, updatedValue, 
System.currentTimeMillis());
-       }
+               final long timestampAfterUpdate = 
ttlTimeProvider.unfreezeTime();
 
-       private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, 
List<ValueWithTs<?>> prevUpdates) {
-               return tooSlow(update) ||
-                       (!prevUpdates.isEmpty() && 
prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update)));
-       }
-
-       private boolean tooSlow(ValueWithTs<?> update) {
-               return update.getTimestampAfterUpdate() - 
update.getTimestampBeforeUpdate() >= ttl;
-       }
+               checkState(
+                               timestampAfterUpdate == timestampBeforeUpdate,
+                               "Timestamps before and after the update do not 
match."
+               );
 
-       private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> 
nextUpdate) {
-               return prevUpdate.getTimestampAfterUpdate() + ttl >= 
nextUpdate.getTimestampBeforeUpdate() &&
-                       prevUpdate.getTimestampBeforeUpdate() + ttl <= 
nextUpdate.getTimestampAfterUpdate();
-       }
-
-       private void resetState(String verifierId) {
-               states.get(verifierId).clear();
-               prevUpdatesByVerifierId.get(verifierId).clear();
+               return new TtlUpdateContext<>(valueBeforeUpdate, update, 
updatedValue, timestampAfterUpdate);
        }
 
        @Override
@@ -153,7 +144,7 @@ class TtlVerifyUpdateFunction
                        .collect(Collectors.toMap(TtlStateVerifier::getId, v -> 
v.createState(context, ttlConfig)));
                prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
                        .collect(Collectors.toMap(TtlStateVerifier::getId, v -> 
{
-                               Preconditions.checkNotNull(v);
+                               checkNotNull(v);
                                TypeSerializer<ValueWithTs<?>> typeSerializer = 
new ValueWithTs.Serializer(v.getUpdateSerializer());
                                ListStateDescriptor<ValueWithTs<?>> stateDesc = 
new ListStateDescriptor<>(
                                        "TtlPrevValueState_" + v.getId(), 
typeSerializer);
@@ -165,22 +156,17 @@ class TtlVerifyUpdateFunction
        private static class UpdateStat implements Serializable {
                final long reportStatAfterUpdatesNum;
                long updates = 0;
-               long clashes = 0;
                long prevUpdatesNum = 0;
 
                UpdateStat(long reportStatAfterUpdatesNum) {
                        this.reportStatAfterUpdatesNum = 
reportStatAfterUpdatesNum;
                }
 
-               void update(boolean clash, long prevUpdatesSize) {
+               void update(long prevUpdatesSize) {
                        updates++;
-                       if (clash) {
-                               clashes++;
-                       }
                        prevUpdatesNum += prevUpdatesSize;
                        if (updates % reportStatAfterUpdatesNum == 0) {
-                               LOG.info(String.format("Avg update chain 
length: %d, clash stat: %d/%d",
-                                       prevUpdatesNum / updates, clashes, 
updates));
+                               LOG.info(String.format("Avg update chain 
length: %d", prevUpdatesNum / updates));
                        }
                }
        }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
index 7b6def2..46becbb 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
@@ -86,15 +86,18 @@ abstract class AbstractTtlStateVerifier<D extends 
StateDescriptor<S, SV>, S exte
        @Override
        public boolean verify(@Nonnull TtlVerificationContext<?, ?> 
verificationContextRaw) {
                TtlVerificationContext<UV, GV> verificationContext = 
(TtlVerificationContext<UV, GV>) verificationContextRaw;
-               List<ValueWithTs<UV>> updates = new 
ArrayList<>(verificationContext.getPrevUpdates());
-               long currentTimestamp = 
verificationContext.getUpdateContext().getTimestampBeforeUpdate();
-               GV prevValue = expected(updates, currentTimestamp);
+               long currentTimestamp = 
verificationContext.getUpdateContext().getTimestamp();
+
                GV valueBeforeUpdate = 
verificationContext.getUpdateContext().getValueBeforeUpdate();
+               List<ValueWithTs<UV>> updates = new 
ArrayList<>(verificationContext.getPrevUpdates());
+               GV expectedValueBeforeUpdate = expected(updates, 
currentTimestamp);
+
+               GV valueAfterUpdate = 
verificationContext.getUpdateContext().getValueAfterUpdate();
                ValueWithTs<UV> update = 
verificationContext.getUpdateContext().getUpdateWithTs();
-               GV updatedValue = 
verificationContext.getUpdateContext().getUpdatedValue();
                updates.add(update);
-               GV expectedValue = expected(updates, currentTimestamp);
-               return Objects.equals(valueBeforeUpdate, prevValue) && 
Objects.equals(updatedValue, expectedValue);
+               GV expectedValueAfterUpdate = expected(updates, 
currentTimestamp);
+
+               return Objects.equals(valueBeforeUpdate, 
expectedValueBeforeUpdate) && Objects.equals(valueAfterUpdate, 
expectedValueAfterUpdate);
        }
 
        abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long 
currentTimestamp);
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
index 960bbe7..8a62957 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
@@ -71,13 +71,13 @@ class TtlAggregatingStateVerifier extends 
AbstractTtlStateVerifier<
                        return null;
                }
                long acc = AGG_FUNC.createAccumulator();
-               long lastTs = updates.get(0).getTimestampAfterUpdate();
+               long lastTs = updates.get(0).getTimestamp();
                for (ValueWithTs<Integer> update : updates) {
-                       if (expired(lastTs, update.getTimestampAfterUpdate())) {
+                       if (expired(lastTs, update.getTimestamp())) {
                                acc = AGG_FUNC.createAccumulator();
                        }
                        acc = AGG_FUNC.add(update.getValue(), acc);
-                       lastTs = update.getTimestampAfterUpdate();
+                       lastTs = update.getTimestamp();
                }
                return expired(lastTs, currentTimestamp) ? null : 
AGG_FUNC.getResult(acc);
        }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
index c1cc761..bcc8590 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
@@ -74,13 +74,13 @@ class TtlFoldingStateVerifier extends 
AbstractTtlStateVerifier<
                        return null;
                }
                long acc = INIT_VAL;
-               long lastTs = updates.get(0).getTimestampAfterUpdate();
+               long lastTs = updates.get(0).getTimestamp();
                for (ValueWithTs<Integer> update : updates) {
-                       if (expired(lastTs, update.getTimestampAfterUpdate())) {
+                       if (expired(lastTs, update.getTimestamp())) {
                                acc = INIT_VAL;
                        }
                        acc += update.getValue();
-                       lastTs = update.getTimestampAfterUpdate();
+                       lastTs = update.getTimestamp();
                }
                return expired(lastTs, currentTimestamp) ? null : acc;
        }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
index b355aa9..4aed98f 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
@@ -71,7 +71,7 @@ class TtlListStateVerifier extends AbstractTtlStateVerifier<
        @Nonnull
        List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long 
currentTimestamp) {
                return updates.stream()
-                       .filter(u -> !expired(u.getTimestampAfterUpdate(), 
currentTimestamp))
+                       .filter(u -> !expired(u.getTimestamp(), 
currentTimestamp))
                        .map(ValueWithTs::getValue)
                        .collect(Collectors.toList());
        }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
index a9d6b36..eeda78d 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
@@ -87,7 +87,7 @@ class TtlMapStateVerifier extends AbstractTtlStateVerifier<
                        .collect(Collectors.groupingBy(u -> u.getValue().f0))
                        .entrySet().stream()
                        .map(e -> e.getValue().get(e.getValue().size() - 1))
-                       .filter(u -> !expired(u.getTimestampAfterUpdate(), 
currentTimestamp))
+                       .filter(u -> !expired(u.getTimestamp(), 
currentTimestamp))
                        .map(ValueWithTs::getValue)
                        .collect(Collectors.toMap(u -> u.f0, u -> u.f1));
        }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
index 773be05..cd33ed0 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
@@ -73,13 +73,13 @@ class TtlReducingStateVerifier extends 
AbstractTtlStateVerifier<
                        return null;
                }
                int acc = 0;
-               long lastTs = updates.get(0).getTimestampAfterUpdate();
+               long lastTs = updates.get(0).getTimestamp();
                for (ValueWithTs<Integer> update : updates) {
-                       if (expired(lastTs, update.getTimestampAfterUpdate())) {
+                       if (expired(lastTs, update.getTimestamp())) {
                                acc = 0;
                        }
                        acc += update.getValue();
-                       lastTs = update.getTimestampAfterUpdate();
+                       lastTs = update.getTimestamp();
                }
                return expired(lastTs, currentTimestamp) ? null : acc;
        }
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
index 959340b..61bf9e2 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
@@ -24,25 +24,25 @@ import java.io.Serializable;
 
 /** Contains context relevant for state update with TTL. */
 public class TtlUpdateContext<UV, GV> implements Serializable {
-       private final long timestampBeforeUpdate;
+
        private final GV valueBeforeUpdate;
        private final UV update;
-       private final GV updatedValue;
-       private final long timestampAfterUpdate;
+       private final GV valueAfterUpdate;
+       private final long timestamp;
 
        public TtlUpdateContext(
-               long timestampBeforeUpdate,
-               GV valueBeforeUpdate, UV update, GV updatedValue,
-               long timestampAfterUpdate) {
+                       GV valueBeforeUpdate,
+                       UV update,
+                       GV updatedValue,
+                       long timestamp) {
                this.valueBeforeUpdate = valueBeforeUpdate;
                this.update = update;
-               this.updatedValue = updatedValue;
-               this.timestampBeforeUpdate = timestampBeforeUpdate;
-               this.timestampAfterUpdate = timestampAfterUpdate;
+               this.valueAfterUpdate = updatedValue;
+               this.timestamp = timestamp;
        }
 
-       long getTimestampBeforeUpdate() {
-               return timestampBeforeUpdate;
+       long getTimestamp() {
+               return timestamp;
        }
 
        GV getValueBeforeUpdate() {
@@ -51,21 +51,20 @@ public class TtlUpdateContext<UV, GV> implements Serializable {
 
        @Nonnull
        public ValueWithTs<UV> getUpdateWithTs() {
-               return new ValueWithTs<>(update, timestampBeforeUpdate, timestampAfterUpdate);
+               return new ValueWithTs<>(update, timestamp);
        }
 
-       GV getUpdatedValue() {
-               return updatedValue;
+       GV getValueAfterUpdate() {
+               return valueAfterUpdate;
        }
 
        @Override
        public String toString() {
                return "TtlUpdateContext{" +
-                       "timestampBeforeUpdate=" + timestampBeforeUpdate +
-                       ", valueBeforeUpdate=" + valueBeforeUpdate +
+                       "valueBeforeUpdate=" + valueBeforeUpdate +
                        ", update=" + update +
-                       ", updatedValue=" + updatedValue +
-                       ", timestampAfterUpdate=" + timestampAfterUpdate +
+                       ", valueAfterUpdate=" + valueAfterUpdate +
+                       ", timestamp=" + timestamp +
                        '}';
        }
 }
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
index fa4929b..d8bdfd4 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
@@ -61,6 +61,6 @@ class TtlValueStateVerifier
                        return null;
                }
                ValueWithTs<String> lastUpdate = updates.get(updates.size() - 1);
-               return expired(lastUpdate.getTimestampAfterUpdate(), currentTimestamp) ? null : lastUpdate.getValue();
+               return expired(lastUpdate.getTimestamp(), currentTimestamp) ? null : lastUpdate.getValue();
        }
 }
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
index 4c985cd..6d04c4c 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
@@ -36,10 +36,10 @@ public class TtlVerificationContext<UV, GV> implements Serializable {
 
        @SuppressWarnings("unchecked")
        public TtlVerificationContext(
-               int key,
-               @Nonnull String verifierId,
-               @Nonnull List<ValueWithTs<?>> prevUpdates,
-               @Nonnull TtlUpdateContext<?, ?> updateContext) {
+                       int key,
+                       @Nonnull String verifierId,
+                       @Nonnull List<ValueWithTs<?>> prevUpdates,
+                       @Nonnull TtlUpdateContext<?, ?> updateContext) {
                this.key = key;
                this.verifierId = verifierId;
                this.prevUpdates = new ArrayList<>();
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
index 9302377..a4f3080 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -30,33 +30,26 @@ import java.io.Serializable;
 /** User state value with timestamps before and after update. */
 public class ValueWithTs<V> implements Serializable {
        private final V value;
-       private final long timestampBeforeUpdate;
-       private final long timestampAfterUpdate;
+       private final long timestamp;
 
-       public ValueWithTs(V value, long timestampBeforeUpdate, long timestampAfterUpdate) {
+       public ValueWithTs(V value, long timestamp) {
                this.value = value;
-               this.timestampBeforeUpdate = timestampBeforeUpdate;
-               this.timestampAfterUpdate = timestampAfterUpdate;
+               this.timestamp = timestamp;
        }
 
        V getValue() {
                return value;
        }
 
-       public long getTimestampBeforeUpdate() {
-               return timestampBeforeUpdate;
-       }
-
-       public long getTimestampAfterUpdate() {
-               return timestampAfterUpdate;
+       long getTimestamp() {
+               return timestamp;
        }
 
        @Override
        public String toString() {
                return "ValueWithTs{" +
                        "value=" + value +
-                       ", timestampBeforeUpdate=" + timestampBeforeUpdate +
-                       ", timestampAfterUpdate=" + timestampAfterUpdate +
+                       ", timestamp=" + timestamp +
                        '}';
        }
 
@@ -64,7 +57,7 @@ public class ValueWithTs<V> implements Serializable {
        public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
 
                public Serializer(TypeSerializer<?> userValueSerializer) {
-                       super(true, userValueSerializer, LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+                       super(true, userValueSerializer, LongSerializer.INSTANCE);
                }
 
                @SuppressWarnings("unchecked")
@@ -74,7 +67,7 @@ public class ValueWithTs<V> implements Serializable {
 
                @Override
                public ValueWithTs<?> createInstance(@Nonnull Object ... values) {
-                       return new ValueWithTs<>(values[0], (Long) values[1], (Long) values[2]);
+                       return new ValueWithTs<>(values[0], (Long) values[1]);
                }
 
                @Override
@@ -88,9 +81,7 @@ public class ValueWithTs<V> implements Serializable {
                                case 0:
                                        return value.getValue();
                                case 1:
-                                       return value.getTimestampBeforeUpdate();
-                               case 2:
-                                       return value.getTimestampAfterUpdate();
+                                       return value.getTimestamp();
                                default:
                                        throw new FlinkRuntimeException("Unexpected field index for ValueWithTs");
                        }
@@ -99,8 +90,8 @@ public class ValueWithTs<V> implements Serializable {
                @SuppressWarnings("unchecked")
                @Override
                protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance(
-                       PrecomputedParameters precomputed,
-                       TypeSerializer<?>... originalSerializers) {
+                               PrecomputedParameters precomputed,
+                               TypeSerializer<?>... originalSerializers) {
                        return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]);
                }
        }

Reply via email to