[
https://issues.apache.org/jira/browse/FLINK-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688409#comment-16688409
]
ASF GitHub Bot commented on FLINK-10531:
----------------------------------------
azagrebin commented on a change in pull request #7036: [FLINK-10531][e2e] Fix
unstable TTL end-to-end test.
URL: https://github.com/apache/flink/pull/7036#discussion_r233938203
##########
File path:
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
##########
@@ -90,11 +107,83 @@ public static void main(String[] args) throws Exception {
.addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
- .flatMap(new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum))
+ .flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");
env.execute("State TTL test job");
}
+
+ /**
+ * Sets the state backend to a new {@link StubStateBackend} which has a {@link MonotonicTTLTimeProvider}.
+ *
+ * @param env The {@link StreamExecutionEnvironment} of the job.
+ * @return The {@link MonotonicTTLTimeProvider}.
+ */
+ private static MonotonicTTLTimeProvider setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
+ final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider();
+
+ final StateBackend configuredBackend = env.getStateBackend();
+ final StateBackend stubBackend = new StubStateBackend(configuredBackend, ttlTimeProvider);
+ env.setStateBackend(stubBackend);
+
+ return ttlTimeProvider;
+ }
+
+ /**
+ * A stub implementation of the {@link StateBackend} that allows the use of
+ * a custom {@link TtlTimeProvider}.
+ */
+ private static final class StubStateBackend implements StateBackend {
Review comment:
Let's also generate a UUID for `StubStateBackend`, `MonotonicTTLTimeProvider`,
`TtlVerifyUpdateFunction`, `UpdateStat`, `AggregateFunction`,
`TtlUpdateContext`, `TtlVerificationContext`, `ValueWithTs` and
`ValueWithTs.Serializer`.
I would also suggest creating a `StateBackendWrapperAdaptor` in the main code,
next to `StateBackend`, which always wraps `wrappedBackend` and forwards all
`StateBackend` interface methods to it. Here we would then need to override
only the relevant methods. The adaptor could be reused in the future for cases
similar to this one. If the `StateBackend` interface gets more methods, only
`StateBackendWrapperAdaptor` would need default forwarding for them, and the
other extending classes could stay untouched.
Otherwise, I would suggest at least moving this class to a separate file.
I leave these last ideas up to you.
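To illustrate the forwarding-adaptor idea, here is a minimal, self-contained sketch. It does not use Flink's real `StateBackend` interface: `SimpleBackend`, `InMemoryBackend`, `BackendWrapperAdaptor` and `FixedTimeBackend` are hypothetical names invented for this example, and the three interface methods are placeholders. The only point is the shape: the adaptor forwards everything, and the test-specific subclass overrides a single method.

```java
import java.util.Objects;
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-in for a backend interface (NOT Flink's StateBackend).
interface SimpleBackend {
    String name();
    long currentTimestamp();
    String createStore(String operatorId);
}

// A "real" backend that the wrapper delegates to.
final class InMemoryBackend implements SimpleBackend {
    @Override public String name() { return "in-memory"; }
    @Override public long currentTimestamp() { return System.currentTimeMillis(); }
    @Override public String createStore(String operatorId) { return "store-for-" + operatorId; }
}

// The adaptor: forwards every interface method to the wrapped backend.
// If the interface grows, only this class needs a new forwarding method.
class BackendWrapperAdaptor implements SimpleBackend {
    protected final SimpleBackend wrappedBackend;

    BackendWrapperAdaptor(SimpleBackend wrappedBackend) {
        this.wrappedBackend = Objects.requireNonNull(wrappedBackend);
    }

    @Override public String name() { return wrappedBackend.name(); }
    @Override public long currentTimestamp() { return wrappedBackend.currentTimestamp(); }
    @Override public String createStore(String operatorId) { return wrappedBackend.createStore(operatorId); }
}

// Test-specific subclass: overrides only the method it cares about
// (here, the time source) and inherits forwarding for the rest.
final class FixedTimeBackend extends BackendWrapperAdaptor {
    private final LongSupplier timeProvider;

    FixedTimeBackend(SimpleBackend wrappedBackend, LongSupplier timeProvider) {
        super(wrappedBackend);
        this.timeProvider = Objects.requireNonNull(timeProvider);
    }

    @Override public long currentTimestamp() { return timeProvider.getAsLong(); }
}

class WrapperAdaptorDemo {
    public static void main(String[] args) {
        SimpleBackend backend = new FixedTimeBackend(new InMemoryBackend(), () -> 42L);
        System.out.println(backend.name());              // forwarded to InMemoryBackend
        System.out.println(backend.currentTimestamp());  // supplied by the custom time provider
        System.out.println(backend.createStore("op-1")); // forwarded to InMemoryBackend
    }
}
```

With this shape, a class such as `StubStateBackend` would extend the adaptor and override only the method that injects the custom time provider, instead of re-implementing every forwarding method itself.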
> State TTL RocksDb backend end-to-end test failed on Travis
> ----------------------------------------------------------
>
> Key: FLINK-10531
> URL: https://issues.apache.org/jira/browse/FLINK-10531
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.6.1
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
>
> The {{State TTL RocksDb backend end-to-end test}} failed on Travis.
> https://travis-ci.org/apache/flink/jobs/438226190
> https://api.travis-ci.org/v3/job/438226190/log.txt