[
https://issues.apache.org/jira/browse/IGNITE-16176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495565#comment-17495565
]
Mikhail Petrov commented on IGNITE-16176:
-----------------------------------------
TC Run-All extensions -
https://ci.ignite.apache.org/project.html?projectId=IgniteExtensions&branch_IgniteExtensions=pull%2F99%2F
> Configurable request timeouts in KafkaToIgniteCdcStreamerApplier and
> IgniteToKafkaCdcStreamer
> ---------------------------------------------------------------------------------------------
>
> Key: IGNITE-16176
> URL: https://issues.apache.org/jira/browse/IGNITE-16176
> Project: Ignite
> Issue Type: Improvement
> Reporter: Ilya Shishkov
> Assignee: Mikhail Petrov
> Priority: Minor
> Labels: IEP-59, ise
>
> Now KafkaToIgniteCdcStreamerApplier[1] and IgniteToKafkaCdcStreamer[2]
> perform requests with a hard-coded timeout equal to {{DFLT_REQ_TIMEOUT}}:
> {code:title=KafkaToIgniteCdcStreamerApplier}
> /** */
> public static final int DFLT_REQ_TIMEOUT = 3;
> ...
> private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws
> IgniteCheckedException {
> ConsumerRecords<Integer, byte[]> recs =
> cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
> if (log.isDebugEnabled()) {
> log.debug(
> "Polled from consumer [assignments=" + cnsmr.assignment() +
> ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
> );
> }
> apply(F.iterator(recs, this::deserialize, true, rec ->
> F.isEmpty(caches) || caches.contains(rec.key())));
> cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
> }
> {code}
> {code:title=IgniteToKafkaCdcStreamer}
> /** Default kafka request timeout in seconds. */
> public static final int DFLT_REQ_TIMEOUT = 5;
>
> ...
> @Override public boolean onEvents(Iterator<CdcEvent> evts) {
> List<Future<RecordMetadata>> futs = new ArrayList<>();
> ...
> if (!futs.isEmpty()) {
> try {
> for (Future<RecordMetadata> fut : futs)
> fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
> msgsSnt.add(futs.size());
> lastMsgTs.value(System.currentTimeMillis());
> }
> {code}
> We should have configurable timeout for requests to the Kafka.
> #
> https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java#L203
> #
> https://github.com/apache/ignite-extensions/blob/master/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java#L197
--
This message was sent by Atlassian Jira
(v8.20.1#820001)