[ 
https://issues.apache.org/jira/browse/FLINK-32751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758986#comment-17758986
 ] 

Matthias Pohl commented on FLINK-32751:
---------------------------------------

There's a race condition between closing the coordinator (which closes the 
socket and sets the corresponding field to {{null}}) and handling a request, 
which opens a new socket if that field is {{null}}. If the close call happens 
after a new request is submitted but before the request handling performs its 
null check, a new socket is created that is never closed and never receives 
any data. As a result, the response never returns.
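
The interleaving described above can be illustrated with a minimal sketch. 
All names here ({{CoordinatorSketch}}, {{Connection}}, {{handleRequest}}) are 
hypothetical stand-ins, not Flink's actual classes, and the guard shown is 
only one possible way to close the gap, not necessarily the fix that will be 
merged. It assumes a {{closed}} flag that is checked under the same lock that 
protects the null check, so a request arriving after close cannot recreate 
the connection.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the coordinator; not Flink's actual class. */
final class CoordinatorSketch implements Closeable {

    /** Hypothetical stand-in for the socket; counts currently open instances. */
    static final class Connection implements Closeable {
        static final AtomicInteger OPEN = new AtomicInteger();

        Connection() {
            OPEN.incrementAndGet();
        }

        @Override
        public void close() {
            OPEN.decrementAndGet();
        }
    }

    private final Object lock = new Object();
    private Connection connection; // guarded by lock
    private boolean closed;        // guarded by lock

    /** Returns true if the request was handled, false if it was rejected after close. */
    boolean handleRequest() {
        synchronized (lock) {
            if (closed) {
                // Without this check, the null check below would reopen a
                // connection that nobody closes anymore.
                return false;
            }
            if (connection == null) {
                connection = new Connection();
            }
            return true;
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            closed = true;
            if (connection != null) {
                connection.close();
                connection = null;
            }
        }
    }
}
```

In this sketch, a request that loses the race against {{close}} is rejected 
instead of waiting on a connection that never delivers data.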

This race condition could also occur prior to the changes we introduced 
in the merges documented in [the comment 
above|https://issues.apache.org/jira/browse/FLINK-32751?focusedCommentId=17756703&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17756703].
 I'm still puzzled why this issue has started to appear more frequently now. 
The main differences (besides the log output refactoring) were the change from 
{{shutdown}} to {{shutdownNow}} for the executor service and the point at 
which the {{closeConnection}} call happens (before or after the shutdown). I 
would have expected us to run into this issue more often with the 
{{closeConnection}} call happening before the shutdown of the 
{{ExecutorService}} (which is how it was implemented before the recent 
changes) because there's a higher chance for the socket to be recreated. I 
still have to investigate that one.
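
For reference, the behavioral difference between {{shutdown}} and 
{{shutdownNow}} can be shown with plain {{java.util.concurrent}} calls. The 
class and method names below ({{ShutdownSketch}}, {{droppedByShutdownNow}}) 
are hypothetical and unrelated to the Flink code; the sketch only shows that 
{{shutdownNow}} interrupts the running task and hands back queued tasks 
without running them, whereas {{shutdown}} would still let queued tasks 
execute.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; not part of Flink. */
final class ShutdownSketch {

    /** Returns how many queued tasks shutdownNow() prevented from starting. */
    static int droppedByShutdownNow() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch neverReleased = new CountDownLatch(1);

        // First task occupies the only worker thread until it is interrupted.
        executor.execute(() -> {
            started.countDown();
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Second task stays in the queue because the worker is busy.
        executor.execute(() -> { });

        try {
            started.await();
            // Interrupts the running task and returns the tasks that never started.
            List<Runnable> neverStarted = executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return neverStarted.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With {{shutdown}} in place of {{shutdownNow}}, the first task would not be 
interrupted and the queued task would still be executed once the worker 
became free.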

But I'm working on a fix now with more elaborate unit testing of the class.

> DistinctAggregateITCaseBase.testMultiDistinctAggOnDifferentColumn got stuck 
> on AZP
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-32751
>                 URL: https://issues.apache.org/jira/browse/FLINK-32751
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.0, 1.16.3, 1.17.2, 1.19.0
>            Reporter: Sergey Nuyanzin
>            Assignee: Matthias Pohl
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>
> This build hangs 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51955&view=logs&j=ce3801ad-3bd5-5f06-d165-34d37e757d90&t=5e4d9387-1dcc-5885-a901-90469b7e6d2f&l=14399
> {noformat}
> Aug 04 03:03:47 "ForkJoinPool-1-worker-51" #28 daemon prio=5 os_prio=0 
> cpu=49342.66ms elapsed=3079.49s tid=0x00007f67ccdd0000 nid=0x5234 waiting on 
> condition  [0x00007f6791a19000]
> Aug 04 03:03:47    java.lang.Thread.State: WAITING (parking)
> Aug 04 03:03:47       at 
> jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> Aug 04 03:03:47       - parking to wait for  <0x00000000ad3b1fb8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> Aug 04 03:03:47       at 
> java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
> Aug 04 03:03:47       at 
> java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
> Aug 04 03:03:47       at 
> java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3118)
> Aug 04 03:03:47       at 
> java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
> Aug 04 03:03:47       at 
> java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:171)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
> Aug 04 03:03:47       at 
> java.util.Iterator.forEachRemaining([email protected]/Iterator.java:132)
> Aug 04 03:03:47       at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:122)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:309)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:145)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:109)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.batch.sql.agg.DistinctAggregateITCaseBase.testMultiDistinctAggOnDifferentColumn(DistinctAggregateITCaseBase.scala:97)
> ~~
> {noformat}
> This is very likely an old issue.
> A similar case, FLINK-16923, was reported for 1.11.0 and closed for lack of 
> occurrences. Another similar one, FLINK-22100, was marked as a duplicate of 
> FLINK-21996.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)