[ https://issues.apache.org/jira/browse/IGNITE-18209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ilya Shishkov updated IGNITE-18209:
-----------------------------------
Description:
Currently, the synchronized method
{{KafkaToIgniteMetadataUpdater#updateMetadata}} is a possible bottleneck when
all {{KafkaToIgniteCdcStreamerApplier}} instances try to update metadata at
the same moment.
Scenario:
# {{KafkaToIgniteCdcStreamer}} contains multiple
{{KafkaToIgniteCdcStreamerApplier}} instances, which share a _single_
{{KafkaToIgniteMetadataUpdater}}.
# Each applier handles its corresponding partitions sequentially.
# An object of an unknown type is inserted, which leads to type and mapping
registrations on all nodes.
# {{META_UPDATE_MARKER}} is sent twice to each Kafka partition of the event
topic {_}from every node{_}: first for the type mapping update, then for the
binary type update.
# When the first {{KafkaToIgniteCdcStreamerApplier}} encounters
{{META_UPDATE_MARKER}}, it calls
{{KafkaToIgniteMetadataUpdater#updateMetadata}}, which in turn calls
{{KafkaConsumer#poll}}. The poll returns immediately [1] when data is present
in the metadata topic. If there are only a few binary types and mappings to
update, a single {{KafkaToIgniteCdcStreamerApplier}} thread will consume all
entries from the metadata topic.
# All other threads of all {{KafkaToIgniteCdcStreamerApplier}} instances will
then call {{KafkaConsumer#poll}} on the now-empty metadata topic, and each
call will block until new data becomes available or the request timeout
expires [1].
# Because access to {{KafkaToIgniteMetadataUpdater#updateMetadata}} is
{{synchronized}}, the calls from all threads of all
{{KafkaToIgniteCdcStreamerApplier}} instances are serialized. Each call
blocks the remaining applier threads for the {{kafkaReqTimeout}} period (if
the metadata topic remains empty).
# The last call, i.e. the polling of the last Kafka partition in this chain,
will happen after at least
{{clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout}}. For example,
with the default timeout and 16 Kafka partitions, the _last partition will be
consumed after 1.5 minutes_ in the case of two one-node clusters.
# The number of threads in {{KafkaToIgniteCdcStreamer}} does not matter.
# Data updates are blocked for Kafka partitions with unhandled update markers.
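The lower bound from the scenario above can be checked with a short calculation. This is only a sketch: the class and method names are made up for illustration, and the 3 second value for {{kafkaReqTimeout}} and {{clusterSize = 1}} are assumptions that happen to match the 1.5 minutes figure; neither value is stated in this description.

```java
import java.time.Duration;

final class MetadataSyncDelay {
    /**
     * Lower bound on the delay before the last partition is polled:
     * clusterSize x (topicPartitions x 2 - 1) x kafkaReqTimeout.
     */
    static Duration lowerBound(int clusterSize, int topicPartitions, Duration kafkaReqTimeout) {
        // Two markers per partition per node; every call except the first one
        // is assumed to block for the full timeout on an empty metadata topic.
        long blockedPolls = (long) clusterSize * (topicPartitions * 2L - 1);
        return kafkaReqTimeout.multipliedBy(blockedPolls);
    }

    public static void main(String[] args) {
        // Assumed values: one-node source cluster, 16 partitions, 3 s timeout.
        Duration delay = lowerBound(1, 16, Duration.ofSeconds(3));
        System.out.println(delay.getSeconds() + " s"); // prints "93 s"
    }
}
```

Under these assumptions the result is 1 x 31 x 3 s = 93 s, i.e. roughly 1.5 minutes.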
As I understand it, the possible solutions are:
# Tune the timeouts for {{KafkaConsumer#poll}}:
[PR#202|https://github.com/apache/ignite-extensions/pull/202]
# -Eliminate blocking in {{KafkaToIgniteMetadataUpdater#updateMetadata}}
caused by calling {{KafkaConsumer#poll}} over empty metadata topic. We can try
to calculate a lag for topic and poll data only when the lag is present.-
[PR#199|https://github.com/apache/ignite-extensions/pull/199]
# -Hold information about replicated types or get it from
{{BinaryContext}}. Information about the type can be sent with
{{META_UPDATE_MARKER}}:- see PoC
[PR#187|https://github.com/apache/ignite-extensions/pull/187].
# {-}Completely remove metadata topic, and send metadata merged with marker
directly into event topic{-}: see PoC
[PR#196|https://github.com/apache/ignite-extensions/pull/196]
# Any other ways to sync appliers?
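To illustrate the crossed-out lag-based option from the list above, here is a minimal sketch of "poll only when the lag is present". It is not the code from PR#199. {{MetadataLog}} and {{InMemoryLog}} are hypothetical stand-ins written only for this example; with a real consumer the two offsets would presumably come from {{KafkaConsumer#endOffsets}} and {{KafkaConsumer#position}}.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class LagAwareMetadataUpdater {
    /** Hypothetical stand-in for the metadata topic consumer (not a Kafka or Ignite API). */
    interface MetadataLog {
        long endOffset();                    // offset just past the last written record
        long position();                     // offset of the next record to be read
        List<String> poll(Duration timeout); // a real consumer may block here when the log is empty
    }

    /** In-memory log, present only to make the sketch runnable; its poll never blocks. */
    static final class InMemoryLog implements MetadataLog {
        private final List<String> records = new ArrayList<>();
        private int pos;
        int pollCalls;

        synchronized void append(String rec) { records.add(rec); }

        @Override public synchronized long endOffset() { return records.size(); }

        @Override public synchronized long position() { return pos; }

        @Override public synchronized List<String> poll(Duration timeout) {
            pollCalls++;
            List<String> out = new ArrayList<>(records.subList(pos, records.size()));
            pos = records.size();
            return out;
        }
    }

    private final MetadataLog log;
    private final Duration timeout;

    LagAwareMetadataUpdater(MetadataLog log, Duration timeout) {
        this.log = log;
        this.timeout = timeout;
    }

    /** Returns the number of metadata records read; skips the poll when there is no lag. */
    synchronized int updateMetadata() {
        long lag = log.endOffset() - log.position();
        if (lag <= 0)
            return 0; // nothing new, so other appliers are not held up by a blocking poll
        return log.poll(timeout).size();
    }

    public static void main(String[] args) {
        InMemoryLog log = new InMemoryLog();
        LagAwareMetadataUpdater updater = new LagAwareMetadataUpdater(log, Duration.ofSeconds(3));
        log.append("type-mapping");
        log.append("binary-type");
        System.out.println(updater.updateMetadata()); // prints 2
        System.out.println(updater.updateMetadata()); // prints 0, poll was skipped
        System.out.println(log.pollCalls);            // prints 1
    }
}
```

With this check the method is still synchronized, but a call that finds no lag returns at once instead of holding the lock for {{kafkaReqTimeout}}.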
Links:
# [https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]
> Reduce binary metadata synchronization time for CDC through Kafka
> -----------------------------------------------------------------
>
> Key: IGNITE-18209
> URL: https://issues.apache.org/jira/browse/IGNITE-18209
> Project: Ignite
> Issue Type: Improvement
> Components: extensions
> Reporter: Ilya Shishkov
> Assignee: Ilya Shishkov
> Priority: Minor
> Labels: IEP-59, ise