[ https://issues.apache.org/jira/browse/FLINK-7287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16103450#comment-16103450 ]
ASF GitHub Bot commented on FLINK-7287:
---------------------------------------
GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/4414
[FLINK-7287][tests] fix test instabilities in KafkaConsumerTestBase
## What is the purpose of the change
Fix test instabilities in `KafkaConsumerTestBase`.
## Brief change log
* Properly ignore the `JobCancellationException`. Several tests should always
have failed but did not, because they never waited for the job execution
thread to finish.
* Always wait for the job execution thread to finish before checking the
result and returning from the test. This is not strictly necessary for the
tests, but it may uncover hidden failures between the cancel command and the
actual cancellation. It also lets subsequent tests run on a clean cluster
with no interfering jobs.
## Verifying this change
This change is already covered by existing tests extending
`KafkaConsumerTestBase`, such as `Kafka010ITCase`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NicoK/flink flink-7287
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4414.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4414
----
commit 31f672171bd44f86c4bd31bd383077141840e2a6
Author: Nico Kruber <[email protected]>
Date: 2017-07-27T16:31:13Z
[FLINK-7287][tests] fix checks ignoring a JobCancellationException
commit db4ec9f1237e704b761404b004a25df8f924e546
Author: Nico Kruber <[email protected]>
Date: 2017-07-27T16:32:14Z
[FLINK-7287][tests] fix main test threads not waiting for the job execution thread to finish
----
> test instability in Kafka010ITCase.testCommitOffsetsToKafka
> -----------------------------------------------------------
>
> Key: FLINK-7287
> URL: https://issues.apache.org/jira/browse/FLINK-7287
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector, Tests
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Labels: test-stability
>
> {{Kafka010ITCase.testCommitOffsetsToKafka}} appears to fail sporadically,
> e.g.
> {code}
> ================================================================================
> Test testCommitOffsetsToKafka(org.apache.flink.streaming.connectors.kafka.Kafka010ITCase) is running.
> --------------------------------------------------------------------------------
> 12:29:31,597 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestBase -
> ===================================
> == Writing sequence of 50 into testCommitOffsetsToKafkaTopic with p=3
> ===================================
> 12:29:31,597 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestBase - Writing attempt #1
> 12:29:31,598 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl - Creating topic testCommitOffsetsToKafkaTopic-1
> 12:29:31,598 INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
> 12:29:31,599 INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
> 12:29:31,601 INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
> 12:29:31,615 INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
> 12:29:31,719 INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
> 12:29:31,722 INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
> 12:29:31,728 INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
> 12:29:31,729 INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
> 12:29:31,832 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Starting FlinkKafkaProducer (3/3) to produce into default topic testCommitOffsetsToKafkaTopic-1
> 12:29:31,840 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Starting FlinkKafkaProducer (2/3) to produce into default topic testCommitOffsetsToKafkaTopic-1
> 12:29:31,842 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.
> 12:29:31,844 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Starting FlinkKafkaProducer (1/3) to produce into default topic testCommitOffsetsToKafkaTopic-1
> 12:29:31,844 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.
> 12:29:31,846 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase - Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.
> 12:29:31,998 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestBase - Finished writing sequence
> 12:29:31,998 INFO org.apache.flink.streaming.connectors.kafka.KafkaTestBase - Validating sequence
> 12:29:32,123 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 12:29:32,129 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 12:29:32,136 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 12:29:32,139 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=1}]
> 12:29:32,154 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 2 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=0}]
> 12:29:32,236 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 1 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=2}]
> 12:29:32,496 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 12:29:32,507 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 12:29:32,521 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No restore state for FlinkKafkaConsumer.
> 12:29:32,531 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 1 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=2}]
> 12:29:32,535 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 2 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=0}]
> 12:29:32,628 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='testCommitOffsetsToKafkaTopic-1', partition=1}]
> 12:29:33,017 ERROR org.apache.flink.streaming.connectors.kafka.Kafka010ITCase -
> --------------------------------------------------------------------------------
> Test testCommitOffsetsToKafka(org.apache.flink.streaming.connectors.kafka.Kafka010ITCase) failed with:
> java.lang.RuntimeException: Job failed with an exception
> at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runCommitOffsetsToKafka(KafkaConsumerTestBase.java:251)
> at org.apache.flink.streaming.connectors.kafka.Kafka010ITCase.testCommitOffsetsToKafka(Kafka010ITCase.java:162)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:921)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258046274/log.txt?X-Amz-Expires=30&X-Amz-Date=20170727T155552Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170727/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=0b629a0c2b5daedc65c8aa8eb3293bc956f8fd61dc70066a051ed5878b004dbf
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)