Hi,

Why not track this in a FLIP and a ticket, and link to this discussion thread?

My 2 cents

Etienne

On 15/03/2023 at 10:01, Hongshun Wang wrote:
Hi devs,
I’d like to join this discussion. CC: Qingsheng
As discussed above, partitions found after the first discovery should be
consumed from the EARLIEST offset.

However, when the KafkaSourceEnumerator restarts after a job failure, it cannot
tell whether an unassigned partition was first-discovered or is new,
because the snapshot state currently contains only the assignedPartitions
collection (the partitions already assigned). We can solve this by adding an
unAssignedInitialPartitons collection to the snapshot state, which holds
the first-discovered partitions that have not yet been assigned.
Alternatively, we can merge these two collections into a single
collection by attaching a status to each item.
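To make the "single collection with a status per item" idea concrete, here is a minimal sketch. The names PartitionStatus and EnumStateSketch are hypothetical (not existing Flink classes), and partitions are plain "topic-partition" strings instead of Kafka's TopicPartition so the example has no dependencies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical status attached to each partition tracked by the enumerator.
enum PartitionStatus {
    ASSIGNED,            // already handed to a KafkaSourceReader
    UNASSIGNED_INITIAL   // found by the first discovery, not yet assigned
}

// Hypothetical snapshot state: one map replaces the two separate collections.
// Partitions are "topic-partition" strings to keep the sketch dependency-free.
final class EnumStateSketch {
    private final Map<String, PartitionStatus> partitions = new HashMap<>();

    // Records a partition from the first discovery unless it is already tracked.
    void markInitial(String partition) {
        partitions.putIfAbsent(partition, PartitionStatus.UNASSIGNED_INITIAL);
    }

    // Upgrades (or adds) a partition to ASSIGNED once a reader owns it.
    void markAssigned(String partition) {
        partitions.put(partition, PartitionStatus.ASSIGNED);
    }

    // Returns all tracked partitions that currently have the given status.
    Set<String> withStatus(PartitionStatus status) {
        return partitions.entrySet().stream()
                .filter(e -> e.getValue() == status)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    // Copies the map so later assignments do not mutate the checkpointed view.
    Map<String, PartitionStatus> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(partitions));
    }
}
```

Either representation carries the same information; the single map just avoids keeping two sets in sync.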

Besides, there is another problem, which often occurs in pattern mode:
distinguishing between the following two cases:

    1. Case 1: The first partition discovery is too slow, so a checkpoint
    completes before it finishes, and then the job restarts. In this case the
    restored unAssignedInitialPartitons is an empty collection, meaning that
    no discovery has happened yet. The next discovery should be treated as
    the first discovery.
    2. Case 2: The first discovery returns no partitions, and new
    partitions only show up after several more discoveries. If a
    restart occurs during this period, the restored
    *unAssignedInitialPartitons* is also an empty collection, but here it
    means the discovery ran and found nothing. The next discovery should
    therefore be treated as a new discovery.

We can solve this problem by adding a boolean value (*firstDiscoveryDone*)
to the snapshot state, which indicates whether the first discovery has been
done.
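A minimal sketch of how the restored state could resolve Case 1 and Case 2 when picking a starting point for a partition. StartFrom and RestoredEnumState are hypothetical names; "CONFIGURED_OFFSETS" stands in for whatever offsets the user configured, and already-assigned partitions are deliberately left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical labels for where a partition starts reading. In the connector this
// role belongs to the configured offsets initializer, which is not modeled here.
enum StartFrom { CONFIGURED_OFFSETS, EARLIEST }

// Hypothetical restored enumerator state: the proposed unAssignedInitialPartitons
// collection plus the proposed firstDiscoveryDone flag. Already-assigned
// partitions are not re-decided in this sketch, so they are omitted.
final class RestoredEnumState {
    private final Set<String> unAssignedInitialPartitons;
    private final boolean firstDiscoveryDone;

    RestoredEnumState(Set<String> unAssignedInitialPartitons, boolean firstDiscoveryDone) {
        this.unAssignedInitialPartitons = new HashSet<>(unAssignedInitialPartitons);
        this.firstDiscoveryDone = firstDiscoveryDone;
    }

    /** Decides where an unassigned partition returned by the next discovery should start. */
    StartFrom startFor(String partition) {
        if (!firstDiscoveryDone) {
            // Case 1: no discovery had finished before the checkpoint, so the next
            // discovery is still the first one and uses the configured offsets.
            return StartFrom.CONFIGURED_OFFSETS;
        }
        if (unAssignedInitialPartitons.contains(partition)) {
            // Found by the first discovery but not assigned before the restart.
            return StartFrom.CONFIGURED_OFFSETS;
        }
        // Case 2 and any later discovery: the partition is genuinely new.
        return StartFrom.EARLIEST;
    }
}
```

Without the flag, the first two branches cannot be told apart when the restored collection is empty, which is exactly the Case 1 / Case 2 ambiguity.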

There are also two rejected alternatives:

    1. Make the KafkaSourceEnumerator's snapshotState method blocking, so
    that it returns only after the first-discovered partitions have been
    successfully assigned to the KafkaSourceReader. The advantage of this
    approach is that the snapshot state's variables do not need to change.
    However, if the first-discovered partitions are not assigned before
    checkpointing, the SourceCoordinator's event-loop thread will be blocked;
    since partition assignment also has to run on the event-loop thread, the
    thread ends up waiting on itself (a self-deadlock).
    2. An alternative to the *firstDiscoveryDone* variable: if we make the
    first discovery synchronous, Case 1 can never happen. When the event-loop
    thread starts, it first adds a discovery event to the blocking queue, so
    by the time it executes the checkpoint event, the partitions have already
    been discovered. However, if partition discovery is very time-consuming,
    the SourceCoordinator cannot process other events, such as reader
    registration, while it waits, which wastes coordinator time.
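The self-deadlock in the first rejected alternative can be reproduced with a plain single-threaded executor standing in for the SourceCoordinator's event loop. This is an illustration under that assumption, not coordinator code: the "snapshot" task blocks on an "assignment" task queued on the same thread, and a timeout replaces the indefinite wait so the demo terminates.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class EventLoopDeadlockDemo {
    /**
     * Returns true if a task on a single-threaded "event loop" that waits for
     * another task queued on the same loop stalls until the timeout expires.
     */
    static boolean blockingSnapshotStalls(long timeoutMillis) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for a blocking snapshotState().
            Future<Boolean> snapshot = eventLoop.submit(() -> {
                // Stand-in for partition assignment, which needs the same thread.
                Future<?> assignment = eventLoop.submit(() -> { });
                try {
                    assignment.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false; // assignment ran, so no stall
                } catch (TimeoutException e) {
                    return true;  // assignment could not run while we blocked
                }
            });
            return snapshot.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }
}
```

With an unbounded wait instead of the timeout, the snapshot task would never return, which is the thread self-deadlock described above.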

Best regards,
Hongshun

On 2023/01/13 03:31:20 Qingsheng Ren wrote:
Hi devs,

I’d like to start a discussion about enabling the dynamic partition
discovery feature by default in the Kafka source. Dynamic partition discovery
[1] is a useful feature of the Kafka source, especially when the consumed
Kafka topic scales out, or when the source subscribes to multiple Kafka
topics with a pattern. With this feature enabled, users don’t have to restart
the Flink job to consume messages from new partitions. Currently, dynamic
partition discovery is disabled by default, and users have to explicitly
specify the discovery interval in order to turn it on.

# Breaking changes

For Kafka table source:

- “scan.topic-partition-discovery.interval” will be set to 30 seconds by
default.
- As we need to provide a way for users to disable the feature,
“scan.topic-partition-discovery.interval” = “0” will be used to turn off
the discovery. Before this proposal, “0” meant enabling partition
discovery with an interval of 0, which makes little sense in practice.
Unfortunately, we can't use negative values because the type of this option
is Duration.

For KafkaSource (DataStream API):

- Dynamic partition discovery in the Kafka source will be enabled by default,
with the discovery interval set to 30 seconds.
- To align with the table source, only a positive value of the option
“partition.discovery.interval.ms” can be used to specify the discovery
interval. Both negative values and zero will be interpreted as disabling the
feature.
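A minimal sketch of the proposed interpretation of both options, returning an empty Optional when discovery is disabled. DiscoveryIntervalSketch and its methods are hypothetical helpers, not connector code; null stands for "option not set".

```java
import java.time.Duration;
import java.util.Optional;

final class DiscoveryIntervalSketch {
    // Proposed default for both the table source and the DataStream KafkaSource.
    static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(30);

    /** Table option "scan.topic-partition-discovery.interval": unset -> 30 s, zero -> disabled. */
    static Optional<Duration> fromTableOption(Duration configured) {
        Duration interval = configured == null ? DEFAULT_INTERVAL : configured;
        // Negative values cannot be expressed for this Duration-typed option, so only zero disables.
        return interval.isZero() ? Optional.empty() : Optional.of(interval);
    }

    /** Property "partition.discovery.interval.ms": unset -> 30 s, zero or negative -> disabled. */
    static Optional<Duration> fromDataStreamProperty(String rawMillis) {
        if (rawMillis == null) {
            return Optional.of(DEFAULT_INTERVAL);
        }
        long millis = Long.parseLong(rawMillis.trim());
        return millis > 0 ? Optional.of(Duration.ofMillis(millis)) : Optional.empty();
    }
}
```

Both paths share the same default and agree that only a positive interval keeps discovery on.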

# Overhead of partition discovery

Partition discovery is performed by the KafkaSourceEnumerator, which
asynchronously fetches topic metadata from the Kafka cluster and checks
whether there are any new topics or partitions. This shouldn’t introduce
performance issues on the Flink side.
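The "check for new topics and partitions" step boils down to a set difference between the latest metadata and what the enumerator already knows. A minimal sketch, with a hypothetical PartitionDiff helper and "topic-partition" strings; the asynchronous metadata fetch and periodic scheduling are not modeled.

```java
import java.util.HashSet;
import java.util.Set;

final class PartitionDiff {
    /**
     * Returns partitions present in the latest metadata that the enumerator has
     * not seen before. Neither input set is modified.
     */
    static Set<String> newPartitions(Set<String> fetchedFromMetadata, Set<String> known) {
        Set<String> added = new HashSet<>(fetchedFromMetadata);
        added.removeAll(known);
        return added;
    }
}
```

Because the comparison is purely in-memory, the cost on the Flink side is dominated by the metadata fetch itself.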

On the Kafka broker side, partition discovery sends a MetadataRequest to the
Kafka broker to fetch topic metadata. Considering that the Kafka broker has a
metadata cache and the default request frequency is relatively low (once
every 30 seconds), this is not a heavy operation and the broker's
performance shouldn't be affected much. It would also be great to get some
input from Kafka experts.

Looking forward to your feedback!

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka/#dynamic-partition-discovery

Best regards,
Qingsheng
