Repository: flink Updated Branches: refs/heads/release-1.6 36ae5cd3c -> 6a36afd3a
[FLINK-9858][tests] State TTL End-to-End Test This closes #6361. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a36afd3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a36afd3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a36afd3 Branch: refs/heads/release-1.6 Commit: 6a36afd3a3db3a8c7b0c27aca525218793882239 Parents: 36ae5cd Author: Andrey Zagrebin <[email protected]> Authored: Fri Jul 13 19:27:35 2018 +0200 Committer: Stefan Richter <[email protected]> Committed: Fri Jul 20 10:28:31 2018 +0200 ---------------------------------------------------------------------- .../flink-stream-state-ttl-test/pom.xml | 104 +++++++++++ .../tests/DataStreamStateTTLTestProgram.java | 100 ++++++++++ .../flink/streaming/tests/TtlStateUpdate.java | 45 +++++ .../streaming/tests/TtlStateUpdateSource.java | 78 ++++++++ .../tests/TtlVerifyUpdateFunction.java | 187 +++++++++++++++++++ .../tests/verify/AbstractTtlStateVerifier.java | 105 +++++++++++ .../verify/TtlAggregatingStateVerifier.java | 107 +++++++++++ .../tests/verify/TtlFoldingStateVerifier.java | 87 +++++++++ .../tests/verify/TtlListStateVerifier.java | 78 ++++++++ .../tests/verify/TtlMapStateVerifier.java | 94 ++++++++++ .../tests/verify/TtlReducingStateVerifier.java | 86 +++++++++ .../tests/verify/TtlStateVerifier.java | 60 ++++++ .../tests/verify/TtlUpdateContext.java | 71 +++++++ .../tests/verify/TtlValueStateVerifier.java | 66 +++++++ .../tests/verify/TtlVerificationContext.java | 69 +++++++ .../streaming/tests/verify/ValueWithTs.java | 107 +++++++++++ flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-nightly-tests.sh | 3 + flink-end-to-end-tests/test-scripts/common.sh | 37 ++-- .../test-scripts/test_stream_state_ttl.sh | 83 ++++++++ 20 files changed, 1555 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml new file mode 100644 index 0000000..38bfcee --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests</artifactId> + <version>1.6-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-stream-state-ttl-test</artifactId> + <name>flink-stream-state-ttl-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_2.11</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-datastream-allround-test</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>DataStreamStateTTLTestProgram</finalName> + <artifactSet> + <excludes> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.streaming.tests.DataStreamStateTTLTestProgram</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java new file mode 100644 index 0000000..f4c9619 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; + +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; + +/** + * A test job for State TTL feature. + * + * <p>The test pipeline does the following: + * - generates random keyed state updates for each state TTL verifier (state type) + * - performs update of created state with TTL for each verifier + * - keeps previous updates in other state + * - verifies expected result of last update against preserved history of updates + * + * <p>Program parameters: + * <ul> + * <li>update_generator_source.keyspace (int, default - 100): Number of different keys for updates emitted by the update generator.</li> + * <li>update_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting updates in the update generator. Set to 0 to disable sleeping.</li> + * <li>update_generator_source.sleep_after_elements (long, default - 0): Number of updates to emit before sleeping in the update generator. Set to 0 to disable sleeping.</li> + * <li>state_ttl_verifier.ttl_milli (long, default - 1000): State time-to-live.</li> + * <li>report_stat.after_updates_num (long, default - 200): Report state update statistics after certain number of updates (average update chain length and clashes).</li> + * </ul> + */ +public class DataStreamStateTTLTestProgram { + private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions + .key("update_generator_source.keyspace") + .defaultValue(100); + + private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions + .key("update_generator_source.sleep_time") + .defaultValue(0L); + + private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions + .key("update_generator_source.sleep_after_elements") + .defaultValue(0L); + + private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions + .key("state_ttl_verifier.ttl_milli") + .defaultValue(1000L); + + private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions + .key("report_stat.after_updates_num") + .defaultValue(200L); + + public static void main(String[] args) throws Exception { + final ParameterTool pt = ParameterTool.fromArgs(args); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + setupEnvironment(env, pt); + + int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue()); + long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(), + UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()); + long sleepTime = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_TIME.key(), + UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue()); + Time ttl = Time.milliseconds(pt.getLong(STATE_TTL_VERIFIER_TTL_MILLI.key(), + STATE_TTL_VERIFIER_TTL_MILLI.defaultValue())); + long reportStatAfterUpdatesNum = pt.getLong(REPORT_STAT_AFTER_UPDATES_NUM.key(), + REPORT_STAT_AFTER_UPDATES_NUM.defaultValue()); + + StateTtlConfiguration ttlConfig = StateTtlConfiguration.newBuilder(ttl).build(); + + env + .addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime)) + .name("TtlStateUpdateSource") + .keyBy(TtlStateUpdate::getKey) + .flatMap(new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum)) + .name("TtlVerifyUpdateFunction") + .addSink(new PrintSinkFunction<>()) + .name("PrintFailedVerifications"); + + env.execute("State TTL test job"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java new file mode 100644 index 0000000..e89b544 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Map; + +/** Randomly generated keyed state updates per state type. */ +class TtlStateUpdate implements Serializable { + private final int key; + + @Nonnull + private final Map<String, Object> updates; + + TtlStateUpdate(int key, @Nonnull Map<String, Object> updates) { + this.key = key; + this.updates = updates; + } + + int getKey() { + return key; + } + + Object getUpdate(String verifierId) { + return updates.get(verifierId); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java new file mode 100644 index 0000000..6aff14e --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.tests.verify.TtlStateVerifier; + +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; + +/** + * Source of randomly generated keyed state updates. + * + * <p>Internal loop generates {@code sleepAfterElements} state updates + * for each verifier from {@link TtlStateVerifier#VERIFIERS} using {@link TtlStateVerifier#generateRandomUpdate} + * and waits for {@code sleepTime} to continue generation. + */ +class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> { + private final int maxKey; + private final long sleepAfterElements; + private final long sleepTime; + + /** Flag that determines if this source is running, i.e. generating events. */ + private volatile boolean running = true; + + TtlStateUpdateSource(int maxKey, long sleepAfterElements, long sleepTime) { + this.maxKey = maxKey; + this.sleepAfterElements = sleepAfterElements; + this.sleepTime = sleepTime; + } + + @Override + public void run(SourceContext<TtlStateUpdate> ctx) throws Exception { + Random random = new Random(); + long elementsBeforeSleep = sleepAfterElements; + while (running) { + for (int i = 0; i < sleepAfterElements; i++) { + synchronized (ctx.getCheckpointLock()) { + Map<String, Object> updates = TtlStateVerifier.VERIFIERS.stream() + .collect(Collectors.toMap(TtlStateVerifier::getId, TtlStateVerifier::generateRandomUpdate)); + ctx.collect(new TtlStateUpdate(random.nextInt(maxKey), updates)); + } + } + + if (sleepTime > 0) { + if (elementsBeforeSleep == 1) { + elementsBeforeSleep = sleepAfterElements; + long rnd = sleepTime < Integer.MAX_VALUE ? random.nextInt((int) sleepTime) : 0L; + Thread.sleep(rnd + sleepTime); + } else if (elementsBeforeSleep > 1) { + --elementsBeforeSleep; + } + } + } + } + + @Override + public void cancel() { + running = false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java new file mode 100644 index 0000000..a99a45f --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.tests.verify.TtlStateVerifier; +import org.apache.flink.streaming.tests.verify.TtlUpdateContext; +import org.apache.flink.streaming.tests.verify.TtlVerificationContext; +import org.apache.flink.streaming.tests.verify.ValueWithTs; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Update state with TTL for each verifier. + * + * <p>This function for each verifier from {@link TtlStateVerifier#VERIFIERS} + * - creates state with TTL + * - creates state of previous updates for further verification against it + * - receives random state update + * - gets state value before update + * - updates state with random value + * - gets state value after update + * - checks if this update clashes with any previous updates + * - if clashes, clears state and recreate update + * - verifies last update against previous updates + * - emits verification context in case of failure + */ +class TtlVerifyUpdateFunction + extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class); + + @Nonnull + private final StateTtlConfiguration ttlConfig; + private final long ttl; + private final UpdateStat stat; + + private transient Map<String, State> states; + private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId; + + TtlVerifyUpdateFunction(@Nonnull StateTtlConfiguration ttlConfig, long reportStatAfterUpdatesNum) { + this.ttlConfig = ttlConfig; + this.ttl = ttlConfig.getTtl().toMilliseconds(); + this.stat = new UpdateStat(reportStatAfterUpdatesNum); + } + + @Override + public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Exception { + for (TtlStateVerifier<?, ?> verifier : TtlStateVerifier.VERIFIERS) { + TtlVerificationContext<?, ?> verificationContext = generateUpdateAndVerificationContext(updates, verifier); + if (!verifier.verify(verificationContext)) { + out.collect(verificationContext.toString()); + } + } + } + + private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext( + TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception { + List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId()); + Object update = updates.getUpdate(verifier.getId()); + TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update); + boolean clashes = updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates); + if (clashes) { + resetState(verifier.getId()); + prevUpdates = Collections.emptyList(); + updateContext = performUpdate(verifier, update); + } + stat.update(clashes, prevUpdates.size()); + prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs()); + return new TtlVerificationContext<>(updates.getKey(), verifier.getId(), prevUpdates, updateContext); + } + + private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception { + return StreamSupport + .stream(prevUpdatesByVerifierId.get(verifierId).get().spliterator(), false) + .collect(Collectors.toList()); + } + + private TtlUpdateContext<?, ?> performUpdate( + TtlStateVerifier<?, ?> verifier, Object update) throws Exception { + State state = states.get(verifier.getId()); + long timestampBeforeUpdate = System.currentTimeMillis(); + Object valueBeforeUpdate = verifier.get(state); + verifier.update(state, update); + Object updatedValue = verifier.get(state); + return new TtlUpdateContext<>(timestampBeforeUpdate, + valueBeforeUpdate, update, updatedValue, System.currentTimeMillis()); + } + + private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, List<ValueWithTs<?>> prevUpdates) { + return tooSlow(update) || + (!prevUpdates.isEmpty() && prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update))); + } + + private boolean tooSlow(ValueWithTs<?> update) { + return update.getTimestampAfterUpdate() - update.getTimestampBeforeUpdate() >= ttl; + } + + private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> nextUpdate) { + return prevUpdate.getTimestampAfterUpdate() + ttl >= nextUpdate.getTimestampBeforeUpdate() && + prevUpdate.getTimestampBeforeUpdate() + ttl <= nextUpdate.getTimestampAfterUpdate(); + } + + private void resetState(String verifierId) { + states.get(verifierId).clear(); + prevUpdatesByVerifierId.get(verifierId).clear(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) { + + } + + @Override + public void initializeState(FunctionInitializationContext context) { + states = TtlStateVerifier.VERIFIERS.stream() + .collect(Collectors.toMap(TtlStateVerifier::getId, v -> v.createState(context, ttlConfig))); + prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream() + .collect(Collectors.toMap(TtlStateVerifier::getId, v -> { + Preconditions.checkNotNull(v); + TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer()); + ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>( + "TtlPrevValueState_" + v.getId(), typeSerializer); + KeyedStateStore store = context.getKeyedStateStore(); + return store.getListState(stateDesc); + })); + } + + private static class UpdateStat implements Serializable { + final long reportStatAfterUpdatesNum; + long updates = 0; + long clashes = 0; + long prevUpdatesNum = 0; + + UpdateStat(long reportStatAfterUpdatesNum) { + this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum; + } + + void update(boolean clash, long prevUpdatesSize) { + updates++; + if (clash) { + clashes++; + } + prevUpdatesNum += prevUpdatesSize; + if (updates % reportStatAfterUpdatesNum == 0) { + LOG.info(String.format("Avg update chain length: %d, clash stat: %d/%d", + prevUpdatesNum / updates, clashes, updates)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java new file mode 100644 index 0000000..c56ff19 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +/** Base class for State TTL verifiers. */ +abstract class AbstractTtlStateVerifier<D extends StateDescriptor<S, SV>, S extends State, SV, UV, GV> + implements TtlStateVerifier<UV, GV> { + static final Random RANDOM = new Random(); + + @Nonnull + final D stateDesc; + + AbstractTtlStateVerifier(@Nonnull D stateDesc) { + this.stateDesc = stateDesc; + } + + @Nonnull + static String randomString() { + return StringUtils.getRandomString(RANDOM, 2, 20); + } + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig) { + stateDesc.enableTimeToLive(ttlConfig); + return createState(context); + } + + abstract State createState(FunctionInitializationContext context); + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public TypeSerializer<UV> getUpdateSerializer() { + return (TypeSerializer<UV>) stateDesc.getSerializer(); + } + + @SuppressWarnings("unchecked") + @Override + public GV get(@Nonnull State state) throws Exception { + return getInternal((S) state); + } + + abstract GV getInternal(@Nonnull S state) throws Exception; + + @SuppressWarnings("unchecked") + @Override + public void update(@Nonnull State state, Object update) throws Exception { + updateInternal((S) state, (UV) update); + } + + abstract void updateInternal(@Nonnull S state, UV update) throws Exception; + + @SuppressWarnings("unchecked") + @Override + public boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContextRaw) { + TtlVerificationContext<UV, GV> verificationContext = (TtlVerificationContext<UV, GV>) verificationContextRaw; + List<ValueWithTs<UV>> updates = new ArrayList<>(verificationContext.getPrevUpdates()); + long currentTimestamp = verificationContext.getUpdateContext().getTimestampBeforeUpdate(); + GV prevValue = expected(updates, currentTimestamp); + GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate(); + ValueWithTs<UV> update = verificationContext.getUpdateContext().getUpdateWithTs(); + GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue(); + updates.add(update); + GV expectedValue = expected(updates, currentTimestamp); + return Objects.equals(valueBeforeUpdate, prevValue) && Objects.equals(updatedValue, expectedValue); + } + + abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long currentTimestamp); + + boolean expired(long lastTimestamp, long currentTimestamp) { + return lastTimestamp + stateDesc.getTtlConfig().getTtl().toMilliseconds() <= currentTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java new file mode 100644 index 0000000..960bbe7 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.List; + +class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier< + AggregatingStateDescriptor<Integer, Long, String>, AggregatingState<Integer, String>, Long, Integer, String> { + TtlAggregatingStateVerifier() { + super(new AggregatingStateDescriptor<>("TtlAggregatingStateVerifier", AGG_FUNC, LongSerializer.INSTANCE)); + } + + @Override + @Nonnull + State createState(@Nonnull FunctionInitializationContext context) { + return context.getKeyedStateStore().getAggregatingState(stateDesc); + } + + @Override + @Nonnull + public TypeSerializer<Integer> getUpdateSerializer() { + return IntSerializer.INSTANCE; + } + + @Override + @Nonnull + public Integer generateRandomUpdate() { + return RANDOM.nextInt(100); + } + + @Override + String getInternal(@Nonnull AggregatingState<Integer, String> state) throws Exception { + return state.get(); + } + + @Override + void updateInternal(@Nonnull AggregatingState<Integer, String> state, Integer update) throws Exception { + state.add(update); + } + + @Override + String expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) { + if (updates.isEmpty()) { + return null; + } + long acc = AGG_FUNC.createAccumulator(); + long lastTs = updates.get(0).getTimestampAfterUpdate(); + for (ValueWithTs<Integer> update : updates) { + if (expired(lastTs, update.getTimestampAfterUpdate())) { + acc = AGG_FUNC.createAccumulator(); + } + acc = AGG_FUNC.add(update.getValue(), acc); + lastTs = update.getTimestampAfterUpdate(); + } + return expired(lastTs, currentTimestamp) ? null : AGG_FUNC.getResult(acc); + } + + private static final AggregateFunction<Integer, Long, String> AGG_FUNC = + new AggregateFunction<Integer, Long, String>() { + @Override + public Long createAccumulator() { + return 3L; + } + + @Override + public Long add(Integer value, Long accumulator) { + return accumulator + value; + } + + @Override + public String getResult(Long accumulator) { + return Long.toString(accumulator); + } + + @Override + public Long merge(Long a, Long b) { + return a + b; + } + }; +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java new file mode 100644 index 0000000..c1cc761 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.List; + +@SuppressWarnings("deprecation") +class TtlFoldingStateVerifier extends AbstractTtlStateVerifier< + FoldingStateDescriptor<Integer, Long>, FoldingState<Integer, Long>, Long, Integer, Long> { + private static final long INIT_VAL = 5L; + + TtlFoldingStateVerifier() { + super(new FoldingStateDescriptor<>( + "TtlFoldingStateVerifier", INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE)); + } + + @Override + @Nonnull + State createState(@Nonnull FunctionInitializationContext context) { + return context.getKeyedStateStore().getFoldingState(stateDesc); + } + + @Override + @Nonnull + public TypeSerializer<Integer> getUpdateSerializer() { + return IntSerializer.INSTANCE; + } + + @Override + @Nonnull + public Integer generateRandomUpdate() { + return RANDOM.nextInt(100); + } + + @Override + Long getInternal(@Nonnull FoldingState<Integer, Long> state) throws Exception { + return state.get(); + } + + @Override + void updateInternal(@Nonnull FoldingState<Integer, Long> state, Integer update) throws Exception { + state.add(update); + } + + @Override + Long expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) { + if (updates.isEmpty()) { + return null; + } + long acc = INIT_VAL; + long lastTs = updates.get(0).getTimestampAfterUpdate(); + for (ValueWithTs<Integer> update : updates) { + if (expired(lastTs, update.getTimestampAfterUpdate())) { + acc = INIT_VAL; + } + acc += update.getValue(); + lastTs = update.getTimestampAfterUpdate(); + } + return expired(lastTs, currentTimestamp) ? null : acc; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java new file mode 100644 index 0000000..b355aa9 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +class TtlListStateVerifier extends AbstractTtlStateVerifier< + ListStateDescriptor<String>, ListState<String>, List<String>, String, List<String>> { + TtlListStateVerifier() { + super(new ListStateDescriptor<>("TtlListStateVerifier", StringSerializer.INSTANCE)); + } + + @Override + @Nonnull + State createState(@Nonnull FunctionInitializationContext context) { + return context.getKeyedStateStore().getListState(stateDesc); + } + + @Override + @Nonnull + public TypeSerializer<String> getUpdateSerializer() { + return StringSerializer.INSTANCE; + } + + @Override + @Nonnull + public String generateRandomUpdate() { + return randomString(); + } + + @Override + @Nonnull + List<String> getInternal(@Nonnull ListState<String> state) throws Exception { + return StreamSupport.stream(state.get().spliterator(), false) + .collect(Collectors.toList()); + } + + @Override + void updateInternal(@Nonnull ListState<String> state, String update) throws Exception { + state.add(update); + } + + @Override + @Nonnull + List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) { + return updates.stream() + .filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp)) + .map(ValueWithTs::getValue) + .collect(Collectors.toList()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java new file mode 100644 index 0000000..a9d6b36 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +class TtlMapStateVerifier extends AbstractTtlStateVerifier< + MapStateDescriptor<String, String>, MapState<String, String>, + Map<String, String>, Tuple2<String, String>, Map<String, String>> { + private static final List<String> KEYS = new ArrayList<>(); + static { + IntStream.range(0, RANDOM.nextInt(5) + 5).forEach(i -> KEYS.add(randomString())); + } + + TtlMapStateVerifier() { + super(new MapStateDescriptor<>("TtlMapStateVerifier", StringSerializer.INSTANCE, StringSerializer.INSTANCE)); + } + + @Override + @Nonnull + State createState(@Nonnull FunctionInitializationContext context) { + return context.getKeyedStateStore().getMapState(stateDesc); + } + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public TypeSerializer<Tuple2<String, String>> getUpdateSerializer() { + return new TupleSerializer( + Tuple2.class, new TypeSerializer[] {StringSerializer.INSTANCE, StringSerializer.INSTANCE}); + } + + @Override + @Nonnull + public Tuple2<String, String> generateRandomUpdate() { + return Tuple2.of(KEYS.get(RANDOM.nextInt(KEYS.size())), randomString()); + } + + @Override + @Nonnull + Map<String, String> getInternal(@Nonnull MapState<String, String> state) throws Exception { + return StreamSupport.stream(state.entries().spliterator(), false) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + void updateInternal(@Nonnull MapState<String, String> state, Tuple2<String, String> update) throws Exception { + state.put(update.f0, update.f1); + } + + @Override + @Nonnull + Map<String, String> expected(@Nonnull List<ValueWithTs<Tuple2<String, String>>> updates, long currentTimestamp) { + return updates.stream() + .collect(Collectors.groupingBy(u -> u.getValue().f0)) + .entrySet().stream() + .map(e -> e.getValue().get(e.getValue().size() - 1)) + .filter(u -> !expired(u.getTimestampAfterUpdate(), currentTimestamp)) + .map(ValueWithTs::getValue) + .collect(Collectors.toMap(u -> u.f0, u -> u.f1)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java new file mode 100644 index 0000000..773be05 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.List; + +class TtlReducingStateVerifier extends AbstractTtlStateVerifier< + ReducingStateDescriptor<Integer>, ReducingState<Integer>, Integer, Integer, Integer> { + TtlReducingStateVerifier() { + super(new ReducingStateDescriptor<>( + "TtlReducingStateVerifier", + (ReduceFunction<Integer>) (value1, value2) -> value1 + value2, + IntSerializer.INSTANCE)); + } + + @Override + @Nonnull + State createState(@Nonnull FunctionInitializationContext context) { + return context.getKeyedStateStore().getReducingState(stateDesc); + } + + @Override + @Nonnull + public TypeSerializer<Integer> getUpdateSerializer() { + return IntSerializer.INSTANCE; + } + + @Override + @Nonnull + public Integer generateRandomUpdate() { + return RANDOM.nextInt(100); + } + + @Override + Integer getInternal(@Nonnull ReducingState<Integer> state) throws Exception { + return state.get(); + } + + @Override + void updateInternal(@Nonnull ReducingState<Integer> state, Integer update) throws Exception { + state.add(update); + } + + @Override + Integer expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) { + if (updates.isEmpty()) { + return null; + } + int acc = 0; + long lastTs = updates.get(0).getTimestampAfterUpdate(); + for (ValueWithTs<Integer> update : updates) { + if (expired(lastTs, update.getTimestampAfterUpdate())) { + acc = 0; + } + acc += update.getValue(); + lastTs = update.getTimestampAfterUpdate(); + } + return expired(lastTs, currentTimestamp) ? null : acc; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java new file mode 100644 index 0000000..e1c2e07 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.Arrays; +import java.util.List; + +/** TTL state verifier interface. */ +public interface TtlStateVerifier<UV, GV> { + List<TtlStateVerifier<?, ?>> VERIFIERS = Arrays.asList( + new TtlValueStateVerifier(), + new TtlListStateVerifier(), + new TtlMapStateVerifier(), + new TtlAggregatingStateVerifier(), + new TtlReducingStateVerifier(), + new TtlFoldingStateVerifier() + ); + + @Nonnull + default String getId() { + return this.getClass().getSimpleName(); + } + + @Nonnull + State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig); + + @Nonnull + TypeSerializer<UV> getUpdateSerializer(); + + UV generateRandomUpdate(); + + GV get(@Nonnull State state) throws Exception; + + void update(@Nonnull State state, Object update) throws Exception; + + boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContext); +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java new file mode 100644 index 0000000..959340b --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import javax.annotation.Nonnull; + +import java.io.Serializable; + +/** Contains context relevant for state update with TTL. */ +public class TtlUpdateContext<UV, GV> implements Serializable { + private final long timestampBeforeUpdate; + private final GV valueBeforeUpdate; + private final UV update; + private final GV updatedValue; + private final long timestampAfterUpdate; + + public TtlUpdateContext( + long timestampBeforeUpdate, + GV valueBeforeUpdate, UV update, GV updatedValue, + long timestampAfterUpdate) { + this.valueBeforeUpdate = valueBeforeUpdate; + this.update = update; + this.updatedValue = updatedValue; + this.timestampBeforeUpdate = timestampBeforeUpdate; + this.timestampAfterUpdate = timestampAfterUpdate; + } + + long getTimestampBeforeUpdate() { + return timestampBeforeUpdate; + } + + GV getValueBeforeUpdate() { + return valueBeforeUpdate; + } + + @Nonnull + public ValueWithTs<UV> getUpdateWithTs() { + return new ValueWithTs<>(update, timestampBeforeUpdate, timestampAfterUpdate); + } + + GV getUpdatedValue() { + return updatedValue; + } + + @Override + public String toString() { + return "TtlUpdateContext{" + + "timestampBeforeUpdate=" + timestampBeforeUpdate + + ", valueBeforeUpdate=" + valueBeforeUpdate + + ", update=" + update + + ", updatedValue=" + updatedValue + + ", timestampAfterUpdate=" + timestampAfterUpdate + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java new file mode 100644 index 0000000..fa4929b --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.List; + +class TtlValueStateVerifier + extends AbstractTtlStateVerifier<ValueStateDescriptor<String>, ValueState<String>, String, String, String> { + TtlValueStateVerifier() { + super(new ValueStateDescriptor<>("TtlValueStateVerifier", StringSerializer.INSTANCE)); + } + + @Override + @Nonnull + State createState(FunctionInitializationContext context) { + return context.getKeyedStateStore().getState(stateDesc); + } + + @Nonnull + public String generateRandomUpdate() { + return randomString(); + } + + @Override + String getInternal(@Nonnull ValueState<String> state) throws Exception { + return state.value(); + } + + @Override + void updateInternal(@Nonnull ValueState<String> state, String update) throws Exception { + state.update(update); + } + + @Override + String expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) { + if (updates.isEmpty()) { + return null; + } + ValueWithTs<String> lastUpdate = updates.get(updates.size() - 1); + return expired(lastUpdate.getTimestampAfterUpdate(), currentTimestamp) ? null : lastUpdate.getValue(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java new file mode 100644 index 0000000..4c985cd --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** Data to verify state update with TTL. */ +public class TtlVerificationContext<UV, GV> implements Serializable { + private final int key; + @Nonnull + private final String verifierId; + @Nonnull + private final List<ValueWithTs<UV>> prevUpdates; + @Nonnull + private final TtlUpdateContext<UV, GV> updateContext; + + @SuppressWarnings("unchecked") + public TtlVerificationContext( + int key, + @Nonnull String verifierId, + @Nonnull List<ValueWithTs<?>> prevUpdates, + @Nonnull TtlUpdateContext<?, ?> updateContext) { + this.key = key; + this.verifierId = verifierId; + this.prevUpdates = new ArrayList<>(); + prevUpdates.forEach(pu -> this.prevUpdates.add((ValueWithTs<UV>) pu)); + this.updateContext = (TtlUpdateContext<UV, GV>) updateContext; + } + + @Nonnull + List<ValueWithTs<UV>> getPrevUpdates() { + return prevUpdates; + } + + @Nonnull + TtlUpdateContext<UV, GV> getUpdateContext() { + return updateContext; + } + + @Override + public String toString() { + return "TtlVerificationContext{" + + "key=" + key + + ", verifierId='" + verifierId + '\'' + + ", prevUpdates=" + prevUpdates + + ", updateContext=" + updateContext + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java new file mode 100644 index 0000000..9302377 --- /dev/null +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.io.Serializable; + +/** User state value with timestamps before and after update. */ +public class ValueWithTs<V> implements Serializable { + private final V value; + private final long timestampBeforeUpdate; + private final long timestampAfterUpdate; + + public ValueWithTs(V value, long timestampBeforeUpdate, long timestampAfterUpdate) { + this.value = value; + this.timestampBeforeUpdate = timestampBeforeUpdate; + this.timestampAfterUpdate = timestampAfterUpdate; + } + + V getValue() { + return value; + } + + public long getTimestampBeforeUpdate() { + return timestampBeforeUpdate; + } + + public long getTimestampAfterUpdate() { + return timestampAfterUpdate; + } + + @Override + public String toString() { + return "ValueWithTs{" + + "value=" + value + + ", timestampBeforeUpdate=" + timestampBeforeUpdate + + ", timestampAfterUpdate=" + timestampAfterUpdate + + '}'; + } + + /** Serializer for Serializer. */ + public static class Serializer extends CompositeSerializer<ValueWithTs<?>> { + + public Serializer(TypeSerializer<?> userValueSerializer) { + super(true, userValueSerializer, LongSerializer.INSTANCE, LongSerializer.INSTANCE); + } + + @SuppressWarnings("unchecked") + Serializer(PrecomputedParameters precomputed, TypeSerializer<?>... fieldSerializers) { + super(precomputed, fieldSerializers); + } + + @Override + public ValueWithTs<?> createInstance(@Nonnull Object ... values) { + return new ValueWithTs<>(values[0], (Long) values[1], (Long) values[2]); + } + + @Override + protected void setField(@Nonnull ValueWithTs<?> value, int index, Object fieldValue) { + throw new UnsupportedOperationException(); + } + + @Override + protected Object getField(@Nonnull ValueWithTs<?> value, int index) { + switch (index) { + case 0: + return value.getValue(); + case 1: + return value.getTimestampBeforeUpdate(); + case 2: + return value.getTimestampAfterUpdate(); + default: + throw new FlinkRuntimeException("Unexpected field index for ValueWithTs"); + } + } + + @SuppressWarnings("unchecked") + @Override + protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance( + PrecomputedParameters precomputed, + TypeSerializer<?>... originalSerializers) { + return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 091dd0b..d80ac32 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -50,6 +50,7 @@ under the License. <module>flink-elasticsearch5-test</module> <module>flink-quickstart-test</module> <module>flink-confluent-schema-registry</module> + <module>flink-stream-state-ttl-test</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/run-nightly-tests.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 15c73d5..dc8424f 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -102,5 +102,8 @@ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scrip run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh" +run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" +run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" + printf "\n[PASS] All tests passed\n" exit 0 http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/test-scripts/common.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index c78afe7..f4563cc 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -240,6 +240,15 @@ function start_cluster { done } +function start_taskmanagers { + tmnum=$1 + echo "Start ${tmnum} more task managers" + for (( c=0; c<tmnum; c++ )) + do + $FLINK_DIR/bin/taskmanager.sh start + done +} + function start_and_wait_for_tm { tm_query_result=$(curl -s "http://localhost:8081/taskmanagers") @@ -456,18 +465,17 @@ function s3_delete { https://${bucket}.s3.amazonaws.com/${s3_file} } -# This function starts the given number of task managers and monitors their processes. If a task manager process goes -# away a replacement is started. +# This function starts the given number of task managers and monitors their processes. +# If a task manager process goes away a replacement is started. function tm_watchdog { local expectedTm=$1 while true; do runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`; count=$((expectedTm-runningTm)) - for (( c=0; c<count; c++ )) - do - $FLINK_DIR/bin/taskmanager.sh start > /dev/null - done + if (( count != 0 )); then + start_taskmanagers ${count} > /dev/null + fi sleep 5; done } @@ -508,7 +516,8 @@ function rollback_flink_slf4j_metric_reporter() { function get_metric_processed_records { OPERATOR=$1 - N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1) + JOB_NAME="${2:-General purpose test job}" + N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1) if [ -z $N ]; then N=0 fi @@ -517,7 +526,8 @@ function get_metric_processed_records { function get_num_metric_samples { OPERATOR=$1 - N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l) + JOB_NAME="${2:-General purpose test job}" + N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l) if [ -z $N ]; then N=0 fi @@ -527,13 +537,14 @@ function get_num_metric_samples { function wait_oper_metric_num_in_records { OPERATOR=$1 MAX_NUM_METRICS="${2:-200}" - NUM_METRICS=$(get_num_metric_samples ${OPERATOR}) - OLD_NUM_METRICS=${3:-${NUM_METRICS}} + JOB_NAME="${3:-General purpose test job}" + NUM_METRICS=$(get_num_metric_samples ${OPERATOR} '${JOB_NAME}') + OLD_NUM_METRICS=${4:-${NUM_METRICS}} # monitor the numRecordsIn metric of the state machine operator in the second execution # we let the test finish once the second restore execution has processed 200 records while : ; do - NUM_METRICS=$(get_num_metric_samples ${OPERATOR}) - NUM_RECORDS=$(get_metric_processed_records ${OPERATOR}) + NUM_METRICS=$(get_num_metric_samples ${OPERATOR} "${JOB_NAME}") + NUM_RECORDS=$(get_metric_processed_records ${OPERATOR} "${JOB_NAME}") # only account for metrics that appeared in the second execution if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then @@ -541,7 +552,7 @@ function wait_oper_metric_num_in_records { fi if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then - echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..." + echo "Waiting for job to process up to ${MAX_NUM_METRICS} records, current progress: ${NUM_RECORDS} records ..." sleep 1 else break http://git-wip-us.apache.org/repos/asf/flink/blob/6a36afd3/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh new file mode 100755 index 0000000..fb911f3 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh + +STATE_BACKEND_TYPE="${1:-file}" +STATE_BACKEND_FILE_ASYNC="${2:-false}" +TTL="${3:-1000}" +PRECISION="${4:-5}" +PARALLELISM="${5-3}" +UPDATE_NUM="${6-1000}" + +CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir" + +TEST=flink-stream-state-ttl-test +TEST_PROGRAM_NAME=DataStreamStateTTLTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +setup_flink_slf4j_metric_reporter +function test_cleanup { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + # revert our modifications to the Flink distribution + rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar +} +trap test_cleanup INT +trap test_cleanup EXIT + +start_cluster +start_taskmanagers $PARALLELISM + +function job_id() { + CMD="${FLINK_DIR}/bin/flink run -d -p ${PARALLELISM} ${TEST_PROGRAM_JAR} \ + --test.semantics exactly-once \ + --environment.parallelism ${PARALLELISM} \ + --state_backend ${STATE_BACKEND_TYPE} \ + --state_ttl_verifier.ttl_milli ${TTL} \ + --state_ttl_verifier.precision_milli ${PRECISION} \ + --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \ + --state_backend.file.async ${STATE_BACKEND_FILE_ASYNC} \ + --update_generator_source.sleep_time 10 \ + --update_generator_source.sleep_after_elements 1" + echo "${CMD}" +} + +JOB_CMD=$(job_id) +echo ${JOB_CMD} +JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g') +wait_job_running ${JOB} +wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} 'State TTL test job' + +SAVEPOINT_PATH=$(take_savepoint ${JOB} ${TEST_DATA_DIR} \ + | grep "Savepoint completed. Path:" | sed 's/.* //g') + +cancel_job ${JOB} + +JOB_CMD=$(job_id) +echo ${JOB_CMD} +JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g') +wait_job_running ${JOB} +wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} "State TTL test job" + +# if verification fails job produces failed TTL'ed state updates, +# output would be non-empty and the test will not pass
