This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 52c2ad0 [FLINK-10531][e2e] Fix unstable TTL end-to-end test. 52c2ad0 is described below commit 52c2ad04c25f277ed65b164f53ce278a7380f058 Author: kkloudas <kklou...@gmail.com> AuthorDate: Tue Nov 6 18:02:10 2018 +0100 [FLINK-10531][e2e] Fix unstable TTL end-to-end test. This closes #7036. --- .../tests/DataStreamStateTTLTestProgram.java | 21 ++++- .../streaming/tests/MonotonicTTLTimeProvider.java | 73 +++++++++++++++++ .../flink/streaming/tests/StubStateBackend.java | 94 ++++++++++++++++++++++ .../flink/streaming/tests/TtlStateUpdate.java | 3 + .../streaming/tests/TtlStateUpdateSource.java | 3 + .../streaming/tests/TtlVerifyUpdateFunction.java | 74 +++++++---------- .../tests/verify/AbstractTtlStateVerifier.java | 15 ++-- .../tests/verify/TtlAggregatingStateVerifier.java | 6 +- .../tests/verify/TtlFoldingStateVerifier.java | 6 +- .../tests/verify/TtlListStateVerifier.java | 2 +- .../tests/verify/TtlMapStateVerifier.java | 2 +- .../tests/verify/TtlReducingStateVerifier.java | 6 +- .../streaming/tests/verify/TtlUpdateContext.java | 35 ++++---- .../tests/verify/TtlValueStateVerifier.java | 2 +- .../tests/verify/TtlVerificationContext.java | 8 +- .../flink/streaming/tests/verify/ValueWithTs.java | 31 +++---- 16 files changed, 276 insertions(+), 105 deletions(-) diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java index 3b2e474..1a572f3 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; @@ -74,6 +75,8 @@ public class DataStreamStateTTLTestProgram { setupEnvironment(env, pt); + final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env); + int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue()); long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(), UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()); @@ -90,11 +93,27 @@ public class DataStreamStateTTLTestProgram { .addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime)) .name("TtlStateUpdateSource") .keyBy(TtlStateUpdate::getKey) - .flatMap(new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum)) + .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, reportStatAfterUpdatesNum)) .name("TtlVerifyUpdateFunction") .addSink(new PrintSinkFunction<>()) .name("PrintFailedVerifications"); env.execute("State TTL test job"); } + + /** + * Sets the state backend to a new {@link StubStateBackend} which has a {@link MonotonicTTLTimeProvider}. + * + * @param env The {@link StreamExecutionEnvironment} of the job. + * @return The {@link MonotonicTTLTimeProvider}. + */ + private static MonotonicTTLTimeProvider setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) { + final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider(); + + final StateBackend configuredBackend = env.getStateBackend(); + final StateBackend stubBackend = new StubStateBackend(configuredBackend, ttlTimeProvider); + env.setStateBackend(stubBackend); + + return ttlTimeProvider; + } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java new file mode 100644 index 0000000..0b5637d --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/MonotonicTTLTimeProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Serializable; + +/** + * A stub implementation of a {@link TtlTimeProvider} which guarantees that + * processing time increases monotonically. + */ +@NotThreadSafe +final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable { + + private static final long serialVersionUID = 1L; + + /* + * The following variables are static because the whole TTLTimeProvider will go + * through serialization and, eventually, the state backend and the task executing + * the TtlVerifyUpdateFunction will have different instances of it. + * + * If these were not static, then the TtlVerifyUpdateFunction would e.g. freeze + * the time, but the backend would not be notified about it, resulting in inconsistent + * state. + * + * If the number of task slots per TM changes, then we may need to add also synchronization. + */ + + private static boolean timeIsFrozen = false; + + private static long lastReturnedProcessingTime = Long.MIN_VALUE; + + @Override + public long currentTimestamp() { + if (timeIsFrozen && lastReturnedProcessingTime != Long.MIN_VALUE) { + return lastReturnedProcessingTime; + } + + timeIsFrozen = true; + + final long currentProcessingTime = System.currentTimeMillis(); + if (currentProcessingTime < lastReturnedProcessingTime) { + return lastReturnedProcessingTime; + } + + lastReturnedProcessingTime = currentProcessingTime; + return lastReturnedProcessingTime; + } + + long unfreezeTime() { + timeIsFrozen = false; + return lastReturnedProcessingTime; + } +} diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java new file mode 100644 index 0000000..b93fa36 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/StubStateBackend.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A stub implementation of the {@link StateBackend} that allows the use of + * a custom {@link TtlTimeProvider}. + */ +final class StubStateBackend implements StateBackend { + + private static final long serialVersionUID = 1L; + + private final TtlTimeProvider ttlTimeProvider; + + private final StateBackend backend; + + StubStateBackend(final StateBackend wrappedBackend, final TtlTimeProvider ttlTimeProvider) { + this.backend = checkNotNull(wrappedBackend); + this.ttlTimeProvider = checkNotNull(ttlTimeProvider); + } + + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException { + return backend.resolveCheckpoint(externalPointer); + } + + @Override + public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException { + return backend.createCheckpointStorage(jobId); + } + + @Override + public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider, + MetricGroup metricGroup) throws Exception { + + return backend.createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + this.ttlTimeProvider, + metricGroup + ); + } + + @Override + public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception { + return backend.createOperatorStateBackend(env, operatorIdentifier); + } +} diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java index e89b544..20e21ea 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java @@ -25,6 +25,9 @@ import java.util.Map; /** Randomly generated keyed state updates per state type. */ class TtlStateUpdate implements Serializable { + + private static final long serialVersionUID = 1L; + private final int key; @Nonnull diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java index 6aff14e..7404adf 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java @@ -33,6 +33,9 @@ import java.util.stream.Collectors; * and waits for {@code sleepTime} to continue generation. */ class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> { + + private static final long serialVersionUID = 1L; + private final int maxKey; private final long sleepAfterElements; private final long sleepTime; diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index 3cfb0e2..250041d 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -33,7 +33,6 @@ import org.apache.flink.streaming.tests.verify.TtlUpdateContext; import org.apache.flink.streaming.tests.verify.TtlVerificationContext; import org.apache.flink.streaming.tests.verify.ValueWithTs; import org.apache.flink.util.Collector; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +40,14 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.io.Serializable; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** * Update state with TTL for each verifier. * @@ -62,21 +63,26 @@ import java.util.stream.StreamSupport; * - verifies last update against previous updates * - emits verification context in case of failure */ -class TtlVerifyUpdateFunction - extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction { +class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class); @Nonnull private final StateTtlConfig ttlConfig; - private final long ttl; + private final MonotonicTTLTimeProvider ttlTimeProvider; private final UpdateStat stat; private transient Map<String, State> states; private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId; - TtlVerifyUpdateFunction(@Nonnull StateTtlConfig ttlConfig, long reportStatAfterUpdatesNum) { + TtlVerifyUpdateFunction( + @Nonnull StateTtlConfig ttlConfig, + MonotonicTTLTimeProvider ttlTimeProvider, + long reportStatAfterUpdatesNum) { this.ttlConfig = ttlConfig; - this.ttl = ttlConfig.getTtl().toMilliseconds(); + this.ttlTimeProvider = checkNotNull(ttlTimeProvider); this.stat = new UpdateStat(reportStatAfterUpdatesNum); } @@ -91,17 +97,13 @@ class TtlVerifyUpdateFunction } private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext( - TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception { + TtlStateUpdate updates, + TtlStateVerifier<?, ?> verifier) throws Exception { + List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId()); Object update = updates.getUpdate(verifier.getId()); TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update); - boolean clashes = updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates); - if (clashes) { - resetState(verifier.getId()); - prevUpdates = Collections.emptyList(); - updateContext = performUpdate(verifier, update); - } - stat.update(clashes, prevUpdates.size()); + stat.update(prevUpdates.size()); prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs()); return new TtlVerificationContext<>(updates.getKey(), verifier.getId(), prevUpdates, updateContext); } @@ -113,33 +115,22 @@ class TtlVerifyUpdateFunction } private TtlUpdateContext<?, ?> performUpdate( - TtlStateVerifier<?, ?> verifier, Object update) throws Exception { + TtlStateVerifier<?, ?> verifier, + Object update) throws Exception { + + final long timestampBeforeUpdate = ttlTimeProvider.currentTimestamp(); State state = states.get(verifier.getId()); - long timestampBeforeUpdate = System.currentTimeMillis(); Object valueBeforeUpdate = verifier.get(state); verifier.update(state, update); Object updatedValue = verifier.get(state); - return new TtlUpdateContext<>(timestampBeforeUpdate, - valueBeforeUpdate, update, updatedValue, System.currentTimeMillis()); - } + final long timestampAfterUpdate = ttlTimeProvider.unfreezeTime(); - private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, List<ValueWithTs<?>> prevUpdates) { - return tooSlow(update) || - (!prevUpdates.isEmpty() && prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update))); - } - - private boolean tooSlow(ValueWithTs<?> update) { - return update.getTimestampAfterUpdate() - update.getTimestampBeforeUpdate() >= ttl; - } + checkState( + timestampAfterUpdate == timestampBeforeUpdate, + "Timestamps before and after the update do not match." + ); - private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> nextUpdate) { - return prevUpdate.getTimestampAfterUpdate() + ttl >= nextUpdate.getTimestampBeforeUpdate() && - prevUpdate.getTimestampBeforeUpdate() + ttl <= nextUpdate.getTimestampAfterUpdate(); - } - - private void resetState(String verifierId) { - states.get(verifierId).clear(); - prevUpdatesByVerifierId.get(verifierId).clear(); + return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, timestampAfterUpdate); } @Override @@ -153,7 +144,7 @@ class TtlVerifyUpdateFunction .collect(Collectors.toMap(TtlStateVerifier::getId, v -> v.createState(context, ttlConfig))); prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream() .collect(Collectors.toMap(TtlStateVerifier::getId, v -> { - Preconditions.checkNotNull(v); + checkNotNull(v); TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer()); ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>( "TtlPrevValueState_" + v.getId(), typeSerializer); @@ -165,22 +156,17 @@ class TtlVerifyUpdateFunction private static class UpdateStat implements Serializable { final long reportStatAfterUpdatesNum; long updates = 0; - long clashes = 0; long prevUpdatesNum = 0; UpdateStat(long reportStatAfterUpdatesNum) { this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum; } - void update(boolean clash, long prevUpdatesSize) { + void update(long prevUpdatesSize) { updates++; - if (clash) { - clashes++; - } prevUpdatesNum += prevUpdatesSize; if (updates % reportStatAfterUpdatesNum == 0) { - LOG.info(String.format("Avg update chain length: %d, clash stat: %d/%d", - prevUpdatesNum / updates, clashes, updates)); + LOG.info(String.format("Avg update chain length: %d", prevUpdatesNum / updates)); } } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java index 7b6def2..46becbb 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java @@ -86,15 +86,18 @@ abstract class AbstractTtlStateVerifier<D extends StateDescriptor<S, SV>, S exte @Override public boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContextRaw) { TtlVerificationContext<UV, GV> verificationContext = (TtlVerificationContext<UV, GV>) verificationContextRaw; - List<ValueWithTs<UV>> updates = new ArrayList<>(verificationContext.getPrevUpdates()); - long currentTimestamp = verificationContext.getUpdateContext().getTimestampBeforeUpdate(); - GV prevValue = expected(updates, currentTimestamp); + long currentTimestamp = verificationContext.getUpdateContext().getTimestamp(); + GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate(); + List<ValueWithTs<UV>> updates = new ArrayList<>(verificationContext.getPrevUpdates()); + GV expectedValueBeforeUpdate = expected(updates, currentTimestamp); + + GV valueAfterUpdate = verificationContext.getUpdateContext().getValueAfterUpdate(); ValueWithTs<UV> update = verificationContext.getUpdateContext().getUpdateWithTs(); - GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue(); updates.add(update); - GV expectedValue = expected(updates, currentTimestamp); - return Objects.equals(valueBeforeUpdate, prevValue) && Objects.equals(updatedValue, expectedValue); + GV expectedValueAfterUpdate = expected(updates, currentTimestamp); + + return Objects.equals(valueBeforeUpdate, expectedValueBeforeUpdate) && Objects.equals(valueAfterUpdate, expectedValueAfterUpdate); } abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long currentTimestamp); diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java index 960bbe7..8a62957 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java @@ -71,13 +71,13 @@ class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier< return null; } long acc = AGG_FUNC.createAccumulator(); - long lastTs = updates.get(0).getTimestampAfterUpdate(); + long lastTs = updates.get(0).getTimestamp(); for (ValueWithTs<Integer> update : updates) { - if (expired(lastTs, update.getTimestampAfterUpdate())) { + if (expired(lastTs, update.getTimestamp())) { acc = AGG_FUNC.createAccumulator(); } acc = AGG_FUNC.add(update.getValue(), acc); - lastTs = update.getTimestampAfterUpdate(); + lastTs = update.getTimestamp(); } return expired(lastTs, currentTimestamp) ? null : AGG_FUNC.getResult(acc); } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java index c1cc761..bcc8590 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java @@ -74,13 +74,13 @@ class TtlFoldingStateVerifier extends AbstractTtlStateVerifier< return null; } long acc = INIT_VAL; - long lastTs = updates.get(0).getTimestampAfterUpdate(); + long lastTs = updates.get(0).getTimestamp(); for (ValueWithTs<Integer> update : updates) { - if (expired(lastTs, update.getTimestampAfterUpdate())) { + if (expired(lastTs, update.getTimestamp())) { acc = INIT_VAL; } acc += update.getValue(); - lastTs = update.getTimestampAfterUpdate(); + lastTs = update.getTimestamp(); } return expired(lastTs, currentTimestamp) ? null : acc; } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java index b355aa9..4aed98f 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java @@ -71,7 +71,7 @@ class TtlListStateVerifier extends AbstractTtlStateVerifier< @Nonnull List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) { return updates.stream() - .filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp)) + .filter(u -> !expired(u.getTimestamp(), currentTimestamp)) .map(ValueWithTs::getValue) .collect(Collectors.toList()); } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java index a9d6b36..eeda78d 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java @@ -87,7 +87,7 @@ class TtlMapStateVerifier extends AbstractTtlStateVerifier< .collect(Collectors.groupingBy(u -> u.getValue().f0)) .entrySet().stream() .map(e -> e.getValue().get(e.getValue().size() - 1)) - .filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp)) + .filter(u -> !expired(u.getTimestamp(), currentTimestamp)) .map(ValueWithTs::getValue) .collect(Collectors.toMap(u -> u.f0, u -> u.f1)); } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java index 773be05..cd33ed0 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java @@ -73,13 +73,13 @@ class TtlReducingStateVerifier extends AbstractTtlStateVerifier< return null; } int acc = 0; - long lastTs = updates.get(0).getTimestampAfterUpdate(); + long lastTs = updates.get(0).getTimestamp(); for (ValueWithTs<Integer> update : updates) { - if (expired(lastTs, update.getTimestampAfterUpdate())) { + if (expired(lastTs, update.getTimestamp())) { acc = 0; } acc += update.getValue(); - lastTs = update.getTimestampAfterUpdate(); + lastTs = update.getTimestamp(); } return expired(lastTs, currentTimestamp) ? null : acc; } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java index 959340b..61bf9e2 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java @@ -24,25 +24,25 @@ import java.io.Serializable; /** Contains context relevant for state update with TTL. */ public class TtlUpdateContext<UV, GV> implements Serializable { - private final long timestampBeforeUpdate; + private final GV valueBeforeUpdate; private final UV update; - private final GV updatedValue; - private final long timestampAfterUpdate; + private final GV valueAfterUpdate; + private final long timestamp; public TtlUpdateContext( - long timestampBeforeUpdate, - GV valueBeforeUpdate, UV update, GV updatedValue, - long timestampAfterUpdate) { + GV valueBeforeUpdate, + UV update, + GV updatedValue, + long timestamp) { this.valueBeforeUpdate = valueBeforeUpdate; this.update = update; - this.updatedValue = updatedValue; - this.timestampBeforeUpdate = timestampBeforeUpdate; - this.timestampAfterUpdate = timestampAfterUpdate; + this.valueAfterUpdate = updatedValue; + this.timestamp = timestamp; } - long getTimestampBeforeUpdate() { - return timestampBeforeUpdate; + long getTimestamp() { + return timestamp; } GV getValueBeforeUpdate() { @@ -51,21 +51,20 @@ public class TtlUpdateContext<UV, GV> implements Serializable { @Nonnull public ValueWithTs<UV> getUpdateWithTs() { - return new ValueWithTs<>(update, timestampBeforeUpdate, timestampAfterUpdate); + return new ValueWithTs<>(update, timestamp); } - GV getUpdatedValue() { - return updatedValue; + GV getValueAfterUpdate() { + return valueAfterUpdate; } @Override public String toString() { return "TtlUpdateContext{" + - "timestampBeforeUpdate=" + timestampBeforeUpdate + - ", valueBeforeUpdate=" + valueBeforeUpdate + + "valueBeforeUpdate=" + valueBeforeUpdate + ", update=" + update + - ", updatedValue=" + updatedValue + - ", timestampAfterUpdate=" + timestampAfterUpdate + + ", valueAfterUpdate=" + valueAfterUpdate + + ", timestamp=" + timestamp + '}'; } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java index fa4929b..d8bdfd4 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java @@ -61,6 +61,6 @@ class TtlValueStateVerifier return null; } ValueWithTs<String> lastUpdate = updates.get(updates.size() - 1); - return expired(lastUpdate.getTimestampAfterUpdate(), currentTimestamp) ? null : lastUpdate.getValue(); + return expired(lastUpdate.getTimestamp(), currentTimestamp) ? null : lastUpdate.getValue(); } } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java index 4c985cd..6d04c4c 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java @@ -36,10 +36,10 @@ public class TtlVerificationContext<UV, GV> implements Serializable { @SuppressWarnings("unchecked") public TtlVerificationContext( - int key, - @Nonnull String verifierId, - @Nonnull List<ValueWithTs<?>> prevUpdates, - @Nonnull TtlUpdateContext<?, ?> updateContext) { + int key, + @Nonnull String verifierId, + @Nonnull List<ValueWithTs<?>> prevUpdates, + @Nonnull TtlUpdateContext<?, ?> updateContext) { this.key = key; this.verifierId = verifierId; this.prevUpdates = new ArrayList<>(); diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java index 9302377..a4f3080 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java @@ -30,33 +30,26 @@ import java.io.Serializable; /** User state value with timestamps before and after update. */ public class ValueWithTs<V> implements Serializable { private final V value; - private final long timestampBeforeUpdate; - private final long timestampAfterUpdate; + private final long timestamp; - public ValueWithTs(V value, long timestampBeforeUpdate, long timestampAfterUpdate) { + public ValueWithTs(V value, long timestamp) { this.value = value; - this.timestampBeforeUpdate = timestampBeforeUpdate; - this.timestampAfterUpdate = timestampAfterUpdate; + this.timestamp = timestamp; } V getValue() { return value; } - public long getTimestampBeforeUpdate() { - return timestampBeforeUpdate; - } - - public long getTimestampAfterUpdate() { - return timestampAfterUpdate; + long getTimestamp() { + return timestamp; } @Override public String toString() { return "ValueWithTs{" + "value=" + value + - ", timestampBeforeUpdate=" + timestampBeforeUpdate + - ", timestampAfterUpdate=" + timestampAfterUpdate + + ", timestamp=" + timestamp + '}'; } @@ -64,7 +57,7 @@ public class ValueWithTs<V> implements Serializable { public static class Serializer extends CompositeSerializer<ValueWithTs<?>> { public Serializer(TypeSerializer<?> userValueSerializer) { - super(true, userValueSerializer, LongSerializer.INSTANCE, LongSerializer.INSTANCE); + super(true, userValueSerializer, LongSerializer.INSTANCE); } @SuppressWarnings("unchecked") @@ -74,7 +67,7 @@ public class ValueWithTs<V> implements Serializable { @Override public ValueWithTs<?> createInstance(@Nonnull Object ... values) { - return new ValueWithTs<>(values[0], (Long) values[1], (Long) values[2]); + return new ValueWithTs<>(values[0], (Long) values[1]); } @Override @@ -88,9 +81,7 @@ public class ValueWithTs<V> implements Serializable { case 0: return value.getValue(); case 1: - return value.getTimestampBeforeUpdate(); - case 2: - return value.getTimestampAfterUpdate(); + return value.getTimestamp(); default: throw new FlinkRuntimeException("Unexpected field index for ValueWithTs"); } @@ -99,8 +90,8 @@ public class ValueWithTs<V> implements Serializable { @SuppressWarnings("unchecked") @Override protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance( - PrecomputedParameters precomputed, - TypeSerializer<?>... originalSerializers) { + PrecomputedParameters precomputed, + TypeSerializer<?>... originalSerializers) { return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]); } }