[
https://issues.apache.org/jira/browse/FLINK-20114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17238154#comment-17238154
]
Robert Metzger commented on FLINK-20114:
----------------------------------------
Testing Issue 1:
I started a job with the KafkaSource on a setup where no Kafka broker was
reachable. The job ran without any errors for 15 minutes.
I would expect the job to fail after a timeout of a few minutes if no broker
can be reached.
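The fail-fast behaviour expected here could look roughly like the sketch below. It uses only JDK classes, not the Kafka or Flink APIs: {{BoundedMetadataFetch}} and {{awaitOrFail}} are hypothetical names, and the {{CompletableFuture}} merely stands in for a metadata lookup that never completes because no broker is reachable.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for illustration; not part of Flink or Kafka. */
final class BoundedMetadataFetch {

    /** Waits at most {@code timeout} for the lookup and throws instead of waiting forever. */
    static <T> T awaitOrFail(CompletableFuture<T> lookup, Duration timeout) {
        try {
            return lookup.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up: surface the problem so the job can fail visibly.
            lookup.cancel(true);
            throw new IllegalStateException(
                    "No broker reachable within " + timeout + "; failing the job", e);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for metadata", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Metadata lookup failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a lookup against an unreachable broker: it never completes.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        try {
            awaitOrFail(neverCompletes, Duration.ofMillis(200));
        } catch (IllegalStateException e) {
            System.out.println("Failed as expected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the wait is bounded and ends in an exception. What the right timeout is, and where the source should enforce it, is left open.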
Testing Issue 2:
While testing the new Kafka source on current master, I tried stopping the job
with a savepoint. I came across this unexpected exception:
{code}
2020-11-24 14:37:56,517 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Triggering cancel-with-savepoint for job
fae21d0ce1804445dd4cc904fcdfbf43.
2020-11-24 14:37:56,522 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 488 (type=SAVEPOINT) @ 1606225076519 for job
fae21d0ce1804445dd4cc904fcdfbf43.
2020-11-24 14:37:56,554 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Savepoint stored in
file:/tmp/sm-savepoint/savepoint-fae21d-9723b187d107. Now cancelling
fae21d0ce1804445dd4cc904fcdfbf43.
2020-11-24 14:37:56,555 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 488 for job fae21d0ce1804445dd4cc904fcdfbf43 (7183 bytes in 35 ms).
2020-11-24 14:37:56,555 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job State
machine job (fae21d0ce1804445dd4cc904fcdfbf43) switched from state RUNNING to
CANCELLING.
2020-11-24 14:37:56,556 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (1/5) (b23b8a46bee25317d2bf1822bc043c31) switched from RUNNING to
CANCELING.
2020-11-24 14:37:56,557 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (2/5) (e40a16e61b64c8e71e873383a18c0a45) switched from RUNNING to
CANCELING.
2020-11-24 14:37:56,557 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (3/5) (d5b299c2e3f7a9dd9cfc5579c55845ba) switched from RUNNING to
CANCELING.
2020-11-24 14:37:56,557 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (4/5) (06bb492f8fe20534c9c3e9f5adf16d3c) switched from RUNNING to
CANCELING.
2020-11-24 14:37:56,557 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (5/5) (70c4d09343b647ee4f64b784b88145b6) switched from RUNNING to
CANCELING.
2020-11-24 14:37:56,558 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (1/5) (5b288ff9d47a2528783fe04d30844ac7) switched from
RUNNING to CANCELING.
2020-11-24 14:37:56,558 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (2/5) (e8b2cd2fcd7b5d240d7ce83693ad88b6) switched from
RUNNING to CANCELING.
2020-11-24 14:37:56,558 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (3/5) (a5ff2af6ae3c00104e0430eff149c2e8) switched from
RUNNING to CANCELING.
2020-11-24 14:37:56,558 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (4/5) (26ff08ea4128b9fa16bfe4251f390c11) switched from
RUNNING to CANCELING.
2020-11-24 14:37:56,558 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (5/5) (658b5ccf92f07ac5a3b2ea9b648c4ba9) switched from
RUNNING to CANCELING.
2020-11-24 14:37:56,560 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
checkpoint 488 as completed for source Source: Kafka Source.
2020-11-24 14:37:56,575 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (1/5) (b23b8a46bee25317d2bf1822bc043c31) switched from CANCELING to
CANCELED.
2020-11-24 14:37:56,576 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (3/5) (d5b299c2e3f7a9dd9cfc5579c55845ba) switched from CANCELING to
CANCELED.
2020-11-24 14:37:56,577 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (2/5) (e40a16e61b64c8e71e873383a18c0a45) switched from CANCELING to
CANCELED.
2020-11-24 14:37:56,578 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (5/5) (70c4d09343b647ee4f64b784b88145b6) switched from CANCELING to
CANCELED.
2020-11-24 14:37:56,579 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (2/5) (e8b2cd2fcd7b5d240d7ce83693ad88b6) switched from
CANCELING to CANCELED.
2020-11-24 14:37:56,580 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (5/5) (658b5ccf92f07ac5a3b2ea9b648c4ba9) switched from
CANCELING to CANCELED.
2020-11-24 14:37:56,582 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka
Source (4/5) (06bb492f8fe20534c9c3e9f5adf16d3c) switched from CANCELING to
CANCELED.
2020-11-24 14:37:56,583 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (4/5) (26ff08ea4128b9fa16bfe4251f390c11) switched from
CANCELING to CANCELED.
2020-11-24 14:37:56,589 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (1/5) (5b288ff9d47a2528783fe04d30844ac7) switched from
CANCELING to CANCELED.
2020-11-24 14:37:56,591 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map ->
Sink: Print to Std. Out (3/5) (a5ff2af6ae3c00104e0430eff149c2e8) switched from
CANCELING to CANCELED.
2020-11-24 14:37:56,592 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job State
machine job (fae21d0ce1804445dd4cc904fcdfbf43) switched from state CANCELLING
to CANCELED.
2020-11-24 14:37:56,592 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
checkpoint coordinator for job fae21d0ce1804445dd4cc904fcdfbf43.
2020-11-24 14:37:56,593 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] -
Shutting down
2020-11-24 14:37:56,593 INFO
org.apache.flink.runtime.checkpoint.CompletedCheckpoint [] - Checkpoint
with ID 488 at 'file:/tmp/sm-savepoint/savepoint-fae21d-9723b187d107' not
discarded.
2020-11-24 14:37:56,596 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
fae21d0ce1804445dd4cc904fcdfbf43 reached globally terminal state CANCELED.
2020-11-24 14:37:56,630 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Stopping the JobMaster for job State machine
job(fae21d0ce1804445dd4cc904fcdfbf43).
2020-11-24 14:37:56,631 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing
SourceCoordinator for source Source: Kafka Source.
2020-11-24 14:37:56,631 ERROR
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] -
Exception while handling result from async call in SourceCoordinator-Source:
Kafka Source. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits
change due to
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:199)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$1(ExecutorNotifier.java:86)
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
[?:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:60)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:176)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
~[?:?]
... 3 more
Caused by: java.lang.InterruptedException
at java.lang.Object.wait(Native Method) ~[?:?]
at java.lang.Object.wait(Object.java:321) ~[?:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:58)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:176)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-075706c79af3170480b19252d2647a17:?]
at
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
~[?:?]
... 3 more
2020-11-24 14:37:56,636 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending
SlotPool.
2020-11-24 14:37:56,637 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Close ResourceManager connection
1dc7c4d8b846d99ade33fbd7bfb620cf: Stopping JobMaster for job State machine
job(fae21d0ce1804445dd4cc904fcdfbf43)..
2020-11-24 14:37:56,637 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping
SlotPool.
2020-11-24 14:37:56,638 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
[email protected]://flink@localhost:6123/user/rpc/jobmanager_2
for job fae21d0ce1804445dd4cc904fcdfbf43 from the resource manager.
{code}
It does not seem to cause any issues, except that the logged exception might
confuse users.
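One possible way to avoid the confusing ERROR is to treat an {{InterruptedException}} that arrives after the coordinator has started closing as an expected part of shutdown. The following is a minimal sketch under that assumption, using only JDK classes. {{ShutdownAwareHandler}}, {{Outcome}} and {{onAsyncFailure}} are hypothetical names and do not reflect how {{SourceCoordinatorContext}} is actually implemented.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch; class and method names are not Flink's. */
final class ShutdownAwareHandler {

    enum Outcome { IGNORED_DURING_SHUTDOWN, FAILOVER }

    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Marks the handler as closing; later interrupts are then expected. */
    void close() {
        closed.set(true);
    }

    /** Decides how to treat a failure reported by an async call. */
    Outcome onAsyncFailure(Throwable error) {
        if (closed.get() && hasCause(error, InterruptedException.class)) {
            // The in-flight call was interrupted by our own shutdown.
            return Outcome.IGNORED_DURING_SHUTDOWN;
        }
        return Outcome.FAILOVER;
    }

    /** Walks the cause chain looking for the given exception type. */
    static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```

With this guard, the same wrapped {{InterruptedException}} would still trigger a failover while the job is running, but would be downgraded (for example to a DEBUG message) once {{close()}} has been called.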
Testing Issue 3:
I also noticed that the {{NetworkClient}} kept logging every few seconds for
about 1.5 minutes after the job had finished. It eventually stopped with a timeout.
Maybe we can proactively stop the {{NetworkClient}} on the JobManager to avoid
resource leaks in case the timeout is configured differently on a user setup:
{code}
2020-11-24 14:37:56,637 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Stopping
SlotPool.
2020-11-24 14:37:56,638 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
[email protected]://flink@localhost:6123/user/rpc/jobmanager_2
for job fae21d0ce1804445dd4cc904fcdfbf43 from the resource manager.
2020-11-24 14:37:57,370 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:37:58,642 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:37:59,806 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:38:00,973 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
[ ... ]
2020-11-24 14:38:56,002 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:38:56,639 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
coordinator for source Source: Kafka Source closed.
2020-11-24 14:38:56,959 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
[ ... ]
2020-11-24 14:39:37,927 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:39:38,890 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:39:39,944 WARN org.apache.kafka.clients.NetworkClient
[] - [AdminClient clientId=null-enumerator-admin-client] Connection
to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not
be available.
2020-11-24 14:39:40,050 INFO
org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient
clientId=null-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node
assignment.
2020-11-24 14:39:40,158 INFO
org.apache.kafka.clients.admin.internals.AdminMetadataManager [] - [AdminClient
clientId=null-enumerator-admin-client] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has
exited.
{code}
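The proactive stop suggested above can be sketched as a close with a bounded wait. The example uses only JDK classes: {{RetryingClient}} is a hypothetical stand-in for a client that owns a reconnecting background thread, not the Kafka {{NetworkClient}} itself. Kafka's admin client does offer a {{close(Duration)}} overload that might serve the same purpose, but whether the enumerator should call it, and with what timeout, is an assumption here.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a client that owns a retrying background thread. */
final class RetryingClient implements AutoCloseable {

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Duration closeTimeout;

    RetryingClient(Duration closeTimeout) {
        this.closeTimeout = closeTimeout;
        worker.submit(() -> {
            // Simulates the reconnect loop; it exits once the thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** True once the background thread has fully stopped. */
    boolean isStopped() {
        return worker.isTerminated();
    }

    /** Interrupts the retry loop and waits a bounded time for it to exit. */
    @Override
    public void close() {
        worker.shutdownNow();
        try {
            if (!worker.awaitTermination(closeTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Worker did not stop within " + closeTimeout);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while closing", e);
        }
    }
}
```

Closing this way stops the retry loop as soon as the owner shuts down, so it no longer depends on a timeout that a user may have configured differently.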
Testing Issue 4:
When cancelling my job, I saw the following warning in the logs. I have not
added any jars to my lib folder.
{code}
2020-11-24 15:09:42,325 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
[email protected]://flink@localhost:6123/user/rpc/jobmanager_5
for job b5bb1c9bb705d9d771b63e33dc6e877f from the resource manager.
2020-11-24 15:09:42,329 WARN org.apache.kafka.common.utils.Utils
[] - Failed to close KafkaClient with type
org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError:
org/apache/kafka/common/network/Selector$CloseMode
at org.apache.kafka.common.network.Selector.close(Selector.java:806)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-1eba36fb91ea0b66651554912ceaca10:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:365)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-1eba36fb91ea0b66651554912ceaca10:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
~[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-1eba36fb91ea0b66651554912ceaca10:?]
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-1eba36fb91ea0b66651554912ceaca10:?]
at
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
[blob_p-38777177cae883549cd5d28a76d1854e5087cc9f-1eba36fb91ea0b66651554912ceaca10:?]
at java.lang.Thread.run(Thread.java:832) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.network.Selector$CloseMode
at java.net.URLClassLoader.findClass(URLClassLoader.java:435) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
... 6 more
2020-11-24 15:09:42,330 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
coordinator for source Source: Kafka Source closed.
{code}
> Test Kafka Source based on the new Source API
> ----------------------------------------------
>
> Key: FLINK-20114
> URL: https://issues.apache.org/jira/browse/FLINK-20114
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kafka
> Affects Versions: 1.12.0
> Reporter: Robert Metzger
> Assignee: Roman Khachatryan
> Priority: Critical
> Fix For: 1.12.0
>
>
> Feature introduced in https://issues.apache.org/jira/browse/FLINK-18323
> --------
> [General Information about the Flink 1.12 release
> testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing]
> When testing a feature, consider the following aspects:
> - Is the documentation easy to understand
> - Are the error messages, log messages, APIs etc. easy to understand
> - Is the feature working as expected under normal conditions
> - Is the feature working / failing as expected with invalid input, induced
> errors etc.
> If you find a problem during testing, please file a ticket
> (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket.
> During the testing, and once you are finished, please write a short summary
> of all things you have tested.