[ 
https://issues.apache.org/jira/browse/FLINK-32751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752017#comment-17752017
 ] 

Matthias Pohl edited comment on FLINK-32751 at 8/8/23 12:15 PM:
----------------------------------------------------------------

[~chesnay] had another guess: {{CollectResultFetcher.sendRequest}} (which is 
the call that doesn't return even while the cluster is shut down) triggers the 
following call chain:
 * {{Dispatcher#deliverCoordinationRequestToCoordinator}}
 ** {{JobMaster#deliverCoordinationRequestToCoordinator}}
 *** {{JobMaster#sendRequestToCoordinator}}
 **** {{SchedulerNG#deliverCoordinationRequestToCoordinator}}
 ***** {{OperatorCoordinatorHandler#deliverCoordinationRequestToCoordinator}}
 ****** {{CollectSinkOperatorCoordinator#handleCoordinationRequest}}

The last one submits the request handling to an {{executorService}} which is 
owned by the coordinator. This {{executorService}} is shut down as part of the 
coordinator's close mechanism using the {{shutdown}} call. Any already 
submitted task is not stopped but continues to run while the MiniCluster 
shutdown is happening (where the RPC system is shut down as well in the end). 
There is some chance that the task gets executed but the RPC system is shut 
down in the meantime. Any completion of a future within 
{{CollectSinkOperatorCoordinator#handleRequestImpl}} might be lost when the RPC 
system is shut down.
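
To make the suspected race easier to picture, here is a minimal sketch that uses only {{java.util.concurrent}} and no Flink classes. The class and method names ({{ShutdownRaceSketch}}, {{taskRunsAfterShutdown}}, {{callerTimesOutOnLostResponse}}) are made up for illustration. It only shows the two building blocks of the guess: a task submitted before {{shutdown}} still runs to completion afterwards, and a caller that waits on a future nobody completes blocks until a timeout (or forever, if no timeout is used as in the stack trace of this issue).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; none of these names exist in Flink.
class ShutdownRaceSketch {

    /** Returns true if a task submitted before shutdown() still completed afterwards. */
    static boolean taskRunsAfterShutdown() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<String> response = new CompletableFuture<>();

        // Stands in for the request handling submitted by the coordinator.
        executor.execute(() -> {
            started.countDown();
            try {
                release.await();
                response.complete("response");
            } catch (InterruptedException e) {
                response.completeExceptionally(e);
            }
        });

        started.await();
        // shutdown() rejects new tasks but does not interrupt the running one.
        executor.shutdown();
        release.countDown();

        boolean completed = "response".equals(response.get(5, TimeUnit.SECONDS));
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        return completed && terminated;
    }

    /** Returns true if waiting on a never-completed future runs into the timeout. */
    static boolean callerTimesOutOnLostResponse() throws Exception {
        // Stands in for a response that got lost, e.g. because the transport is gone.
        CompletableFuture<String> lostResponse = new CompletableFuture<>();
        try {
            lostResponse.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("task ran after shutdown: " + taskRunsAfterShutdown());
        System.out.println("caller timed out: " + callerTimesOutOnLostResponse());
    }
}
```

The sketch does not reproduce the RPC shutdown itself. It only assumes that a response completed on the coordinator side may never reach the client-side future, which would match the parked {{CompletableFuture.get}} call in the thread dump.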

I'm gonna go ahead and try to come up with a test scenario for this issue. If 
we conclude that this reasoning makes sense, it would mean that this Jira issue 
isn't blocking the 1.18 release, because the problem also exists in older 
versions of Flink.


> DistinctAggregateITCaseBase.testMultiDistinctAggOnDifferentColumn got stuck 
> on AZP
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-32751
>                 URL: https://issues.apache.org/jira/browse/FLINK-32751
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.18.0
>            Reporter: Sergey Nuyanzin
>            Priority: Critical
>              Labels: test-stability
>
> This build hangs 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51955&view=logs&j=ce3801ad-3bd5-5f06-d165-34d37e757d90&t=5e4d9387-1dcc-5885-a901-90469b7e6d2f&l=14399
> {noformat}
> Aug 04 03:03:47 "ForkJoinPool-1-worker-51" #28 daemon prio=5 os_prio=0 
> cpu=49342.66ms elapsed=3079.49s tid=0x00007f67ccdd0000 nid=0x5234 waiting on 
> condition  [0x00007f6791a19000]
> Aug 04 03:03:47    java.lang.Thread.State: WAITING (parking)
> Aug 04 03:03:47       at 
> jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> Aug 04 03:03:47       - parking to wait for  <0x00000000ad3b1fb8> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> Aug 04 03:03:47       at 
> java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
> Aug 04 03:03:47       at 
> java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
> Aug 04 03:03:47       at 
> java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3118)
> Aug 04 03:03:47       at 
> java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
> Aug 04 03:03:47       at 
> java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:171)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:129)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> Aug 04 03:03:47       at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
> Aug 04 03:03:47       at 
> java.util.Iterator.forEachRemaining([email protected]/Iterator.java:132)
> Aug 04 03:03:47       at 
> org.apache.flink.util.CollectionUtil.iteratorToList(CollectionUtil.java:122)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:309)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.check(BatchTestBase.scala:145)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:109)
> Aug 04 03:03:47       at 
> org.apache.flink.table.planner.runtime.batch.sql.agg.DistinctAggregateITCaseBase.testMultiDistinctAggOnDifferentColumn(DistinctAggregateITCaseBase.scala:97)
> ~~
> {noformat}
> It is very likely that this is an old issue. A similar case was reported for 
> 1.11.0 and closed because of a lack of occurrences (FLINK-16923). Another 
> similar one, FLINK-22100, was marked as a duplicate of FLINK-21996.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
