[
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15662974#comment-15662974
]
ASF GitHub Bot commented on FLINK-5048:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2789#discussion_r87744290
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
+ * deserialize and emit the records.
+ *
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and to even
+ * deadlock in certain situations.
+ *
+ * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+ /** Logger for this consumer */
+ final Logger log;
+
+ /** The handover of data and exceptions between the consumer thread and the task thread */
+ private final Handover handover;
+
+ /** The next offsets that the main thread should commit */
+ private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+ /** The configuration for the Kafka consumer */
+ private final Properties kafkaProperties;
+
+ /** The partitions that this consumer reads from */
+ private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+
+ /** We get this from the outside to publish metrics. **/
+ private final MetricGroup kafkaMetricGroup;
+
+ /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+ private final KafkaConsumerCallBridge consumerCallBridge;
+
+ /** The maximum number of milliseconds to wait for a fetch batch */
+ private final long pollTimeout;
+
+ /** Flag whether to add Kafka's metrics to the Flink metrics */
+ private final boolean useMetrics;
+
+ /** Reference to the Kafka consumer, once it is created */
+ private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+ /** Flag to mark the main work loop as alive */
+ private volatile boolean running;
+
+ /** Flag tracking whether the latest commit request has completed */
+ private volatile boolean commitInProgress;
+
+
+ public KafkaConsumerThread(
+ Logger log,
+ Handover handover,
+ Properties kafkaProperties,
+ KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+ MetricGroup kafkaMetricGroup,
+ KafkaConsumerCallBridge consumerCallBridge,
+ String threadName,
+ long pollTimeout,
+ boolean useMetrics) {
+
+ super(threadName);
+ setDaemon(true);
+
+ this.log = checkNotNull(log);
+ this.handover = checkNotNull(handover);
+ this.kafkaProperties = checkNotNull(kafkaProperties);
+ this.subscribedPartitions = checkNotNull(subscribedPartitions);
+ this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+ this.consumerCallBridge = checkNotNull(consumerCallBridge);
+ this.pollTimeout = pollTimeout;
+ this.useMetrics = useMetrics;
+
+ this.nextOffsetsToCommit = new AtomicReference<>();
+ this.running = true;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void run() {
+ // early exit check
+ if (!running) {
+ return;
+ }
+
+ // this is the means to talk to FlinkKafkaConsumer's main thread
+ final Handover handover = this.handover;
+
+ // This method initializes the KafkaConsumer and guarantees it is torn down properly.
+ // This is important, because the consumer has multi-threading issues,
+ // including concurrent 'close()' calls.
+ final KafkaConsumer<byte[], byte[]> consumer;
+ try {
+ consumer = new KafkaConsumer<>(kafkaProperties);
+ }
+ catch (Throwable t) {
+ handover.reportError(t);
+ return;
+ }
+
+ // from here on, the consumer is guaranteed to be closed properly
+ try {
+ // The callback invoked by Kafka once an offset commit is complete
+ final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
+
+ // tell the consumer which partitions to work with
+ consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+
+ // register Kafka's very own metrics in Flink's metric reporters
+ if (useMetrics) {
+ // register Kafka metrics to Flink
+ Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+ if (metrics == null) {
+ // MapR's Kafka implementation returns null here.
+ log.info("Consumer implementation does not support metrics");
+ } else {
+ // we have Kafka metrics, register them
+ for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+ kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+ }
+ }
+ }
+
+ // early exit check
+ if (!running) {
+ return;
+ }
+
+ // seek the consumer to the initial offsets
+ for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+ if (partition.isOffsetDefined()) {
+ log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
+ "seeking the consumer to position {}",
+ partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+
+ consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+ }
+ else {
+ // for partitions that do not have offsets restored from a checkpoint/savepoint,
+ // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
+ // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+
+ long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+
+ log.info("Partition {} has no initial offset; the consumer has position {}, " +
+ "so the initial offset will be set to {}",
+ partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+ // the fetched offset represents the next record to process, so we need to subtract 1 from it
+ partition.setOffset(fetchedOffset - 1);
+ }
+ }
+
+ // from now on, external operations may call the consumer
+ this.consumer = consumer;
+
+ // the latest bulk of records. may carry across the loop if the thread is woken up
+ // from blocking on the handover
+ ConsumerRecords<byte[], byte[]> records = null;
+
+ // main fetch loop
+ while (running) {
+
+ // check if there is something to commit
+ if (!commitInProgress) {
+ // get and reset the work-to-be committed, so we don't repeatedly commit the same
+ final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+ if (toCommit != null) {
+ log.debug("Sending async offset commit request to Kafka broker");
+
+ // also record that a commit is already in progress
+ // the order here matters! first set the flag, then send the commit command.
+ commitInProgress = true;
+ consumer.commitAsync(toCommit, offsetCommitCallback);
+ }
+ }
+
+ // get the next batch of records, unless we did not manage to hand the old batch over
+ if (records == null) {
+ try {
+ records = consumer.poll(pollTimeout);
+ }
+ catch (WakeupException we) {
+ continue;
+ }
+ }
+
+ try {
+ handover.produce(records);
+ records = null;
+ }
+ catch (Handover.WakeupException e) {
+ // fall through the loop
+ }
+ }
+ // end main fetch loop
+ }
+ catch (Throwable t) {
+ // let the main thread know and exit
+ // it may be that this exception comes because the main thread closed the handover, in
+ // which case the below reporting is irrelevant, but does not hurt either
+ handover.reportError(t);
+ }
+ finally {
+ try {
+ consumer.close();
+ }
+ catch (Throwable t) {
+ log.warn("Error while closing Kafka consumer", t);
+ }
+ }
+ }
+
+ /**
+ * Shuts this thread down, waking up the thread gracefully if blocked.
+ */
+ public void shutdown() {
+ running = false;
+
+ // this wakes up the consumer if it is blocked handing over records
+ handover.close();
+
+ // this wakes up the consumer if it is blocked in a kafka poll
+ if (consumer != null) {
+ consumer.wakeup();
--- End diff --
There was a notice comment before in the code, on calling `close()` on the
consumer:
> // - We cannot call close() on the consumer, because it will actually
throw
> // an exception if a concurrent call is in progress
Perhaps it is reasonable to note that here too: only `wakeup()` is used, and the
consumer is safely closed in the `finally` clause, so that others know why and we
don't get confusing exceptions rethrown to the fetcher thread via the handover.
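The wakeup-only shutdown pattern discussed above can be sketched as follows. This is a hypothetical minimal illustration, not the actual Flink code: `PollLoop` and `FakeConsumer` are stand-ins invented for the example, mimicking only the `wakeup()`/`close()` contract that matters here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the wakeup-only shutdown pattern: shutdown() never calls close()
// (which can throw on concurrent calls); it only wakes the blocked poll, and
// close() runs exactly once, in the polling thread's own finally block.
class PollLoop implements Runnable {

    private final AtomicBoolean running = new AtomicBoolean(true);

    // package-visible so a caller can observe the closed state in this sketch
    final FakeConsumer consumer = new FakeConsumer();

    @Override
    public void run() {
        try {
            while (running.get()) {
                consumer.poll(); // may block; unblocked by wakeup()
            }
        } finally {
            consumer.close(); // the only place close() is ever called
        }
    }

    void shutdown() {
        running.set(false);
        consumer.wakeup(); // safe to call from another thread, unlike close()
    }
}

// Hypothetical stand-in for the KafkaConsumer, so the sketch is self-contained.
class FakeConsumer {

    private volatile boolean wokenUp;
    volatile boolean closed;

    void poll() {
        // simulate a blocking poll that only returns once wakeup() is called
        while (!wokenUp) {
            Thread.onSpinWait();
        }
    }

    void wakeup() {
        wokenUp = true;
    }

    void close() {
        closed = true;
    }
}
```

The key property is that `close()` has a single caller, so a concurrent `shutdown()` can never race it into throwing.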
> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation
> behavior
> ---------------------------------------------------------------------------------
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that
> operates the KafkaConsumer. That thread is shielded from interrupts, because
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the
> network stack (backpressure) or in chained operators. The latter case leads to
> situations where cancellations get very slow unless that thread would be
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
> - A spawned consumer thread pull from the KafkaConsumer and pushes its
> pulled batch of records into a blocking queue (size one)
> - The main thread of the task will pull the record batches from the
> blocking queue and emit the records.
> This allows for some additional I/O overlap while limiting the additional
> memory consumption - only two batches are ever held, one being fetched and
> one being emitted.
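The proposed hand-over between the two threads can be sketched with a size-one blocking queue. This is a simplified stand-alone illustration of the idea (the actual Handover class in the PR additionally carries errors and supports wakeups); the `BatchHandover` name and the use of `String` records are invented for the example.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified illustration of the proposed model: the consumer thread pushes
// record batches into a size-one blocking queue, and the task's main thread
// pulls and emits them. At most two batches are ever in flight: one being
// fetched by the consumer thread and one being emitted by the main thread.
class BatchHandover {

    private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);

    // called by the consumer thread; blocks while the previous batch is unclaimed
    void produce(List<String> batch) throws InterruptedException {
        queue.put(batch);
    }

    // called by the task's main thread; blocks until a batch is available
    List<String> pollNext() throws InterruptedException {
        return queue.take();
    }
}
```

Because the queue has capacity one, backpressure from the emitting side naturally throttles the fetching side without unbounded buffering.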
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)