[
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16280126#comment-16280126
]
ASF GitHub Bot commented on FLINK-7880:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5062#discussion_r155225199
--- Diff:
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
---
@@ -260,89 +260,90 @@ public void testDuplicateRegistrationFailsJob()
throws Exception {
final Deadline deadline = TEST_TIMEOUT.fromNow();
final int numKeys = 256;
- JobID jobId = null;
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStateBackend(stateBackend);
+ env.setParallelism(maxParallelism);
+ // Very important, because cluster is shared between tests and
we
+ // don't explicitly check that all slots are available before
+ // submitting.
+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
1000L));
- try {
- //
- // Test program
- //
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStateBackend(stateBackend);
- env.setParallelism(maxParallelism);
- // Very important, because cluster is shared between
tests and we
- // don't explicitly check that all slots are available
before
- // submitting.
-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
1000L));
-
- DataStream<Tuple2<Integer, Long>> source = env
- .addSource(new
TestKeyRangeSource(numKeys));
-
- // Reducing state
- ReducingStateDescriptor<Tuple2<Integer, Long>>
reducingState = new ReducingStateDescriptor<>(
- "any-name",
- new SumReduce(),
- source.getType());
-
- final String queryName = "duplicate-me";
-
- final QueryableStateStream<Integer, Tuple2<Integer,
Long>> queryableState =
- source.keyBy(new
KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long
serialVersionUID = -4126824763829132959L;
-
- @Override
- public Integer
getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName,
reducingState);
+ DataStream<Tuple2<Integer, Long>> source = env.addSource(new
TestKeyRangeSource(numKeys));
- final QueryableStateStream<Integer, Tuple2<Integer,
Long>> duplicate =
- source.keyBy(new
KeySelector<Tuple2<Integer, Long>, Integer>() {
- private static final long
serialVersionUID = -6265024000462809436L;
+ // Reducing state
+ ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
new ReducingStateDescriptor<>(
+ "any-name",
+ new SumReduce(),
+ source.getType());
- @Override
- public Integer
getKey(Tuple2<Integer, Long> value) throws Exception {
- return value.f0;
- }
- }).asQueryableState(queryName);
+ final String queryName = "duplicate-me";
- // Submit the job graph
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobId = jobGraph.getJobID();
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>>
queryableState =
+ source.keyBy(new KeySelector<Tuple2<Integer,
Long>, Integer>() {
+ private static final long
serialVersionUID = -4126824763829132959L;
- final
CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
- notifyWhenJobStatusIs(jobId,
JobStatus.FAILED, deadline);
+ @Override
+ public Integer getKey(Tuple2<Integer,
Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName, reducingState);
- cluster.submitJobDetached(jobGraph);
+ final QueryableStateStream<Integer, Tuple2<Integer, Long>>
duplicate =
+ source.keyBy(new KeySelector<Tuple2<Integer,
Long>, Integer>() {
+ private static final long
serialVersionUID = -6265024000462809436L;
- TestingJobManagerMessages.JobStatusIs jobStatus =
-
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
- assertEquals(JobStatus.FAILED, jobStatus.state());
+ @Override
+ public Integer getKey(Tuple2<Integer,
Long> value) {
+ return value.f0;
+ }
+ }).asQueryableState(queryName);
+
+ // Submit the job graph
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ final CompletableFuture<TestingJobManagerMessages.JobStatusIs>
failedFuture =
+ notifyWhenJobStatusIs(jobId, JobStatus.FAILED,
deadline);
- // Get the job and check the cause
- JobManagerMessages.JobFound jobFound =
FutureUtils.toJava(
-
cluster.getLeaderGateway(deadline.timeLeft())
- .ask(new
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
- .get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
+ final CompletableFuture<TestingJobManagerMessages.JobStatusIs>
cancellationFuture =
+ notifyWhenJobStatusIs(jobId,
JobStatus.CANCELED, deadline);
- String failureCause =
jobFound.executionGraph().getFailureCause().getExceptionAsString();
+ cluster.submitJobDetached(jobGraph);
- assertTrue("Not instance of SuppressRestartsException",
failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
- int causedByIndex = failureCause.indexOf("Caused by: ");
- String subFailureCause =
failureCause.substring(causedByIndex + "Caused by: ".length());
- assertTrue("Not caused by IllegalStateException",
subFailureCause.startsWith("java.lang.IllegalStateException"));
- assertTrue("Exception does not contain registration
name", subFailureCause.contains(queryName));
+ try {
+ final TestingJobManagerMessages.JobStatusIs jobStatus =
+
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertEquals(JobStatus.FAILED, jobStatus.state());
--- End diff --
isn't this always true if the future did not time out? (In which case get()
throws a TimeoutException)
> flink-queryable-state-java fails with core-dump
> -----------------------------------------------
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, Tests
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Critical
> Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)