[ 
https://issues.apache.org/jira/browse/BEAM-5211?focusedWorklogId=137658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137658
 ]

ASF GitHub Bot logged work on BEAM-5211:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Aug/18 04:01
            Start Date: 24/Aug/18 04:01
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6271: [BEAM-5211] Flink 
Streaming ExecutableStage operator chain blocks grpc receiver threads
URL: https://github.com/apache/beam/pull/6271
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d63a3cb117e..c0e3858658a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -23,7 +23,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
@@ -39,9 +39,9 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -73,6 +73,7 @@
   private transient StateRequestHandler stateRequestHandler;
   private transient BundleProgressHandler progressHandler;
   private transient StageBundleFactory stageBundleFactory;
+  private transient LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
 
   public ExecutableStageDoFnOperator(
       String stepName,
@@ -127,6 +128,7 @@ public void open() throws Exception {
     stateRequestHandler = stageContext.getStateRequestHandler(executableStage, 
getRuntimeContext());
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     progressHandler = BundleProgressHandler.unsupported();
+    outputQueue = new LinkedBlockingQueue<>();
   }
 
   // TODO: currently assumes that every element is a separate bundle,
@@ -137,12 +139,29 @@ private void 
processElementWithSdkHarness(WindowedValue<InputT> element) throws
     checkState(
         stateRequestHandler != null, "%s not yet prepared", 
StateRequestHandler.class.getName());
 
+    OutputReceiverFactory receiverFactory =
+        new OutputReceiverFactory() {
+          @Override
+          public FnDataReceiver<OutputT> create(String pCollectionId) {
+            return (receivedElement) -> {
+              // handover to queue, do not block the grpc thread
+              outputQueue.put(KV.of(pCollectionId, receivedElement));
+            };
+          }
+        };
+
     try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            new ReceiverFactory(outputManager, outputMap), 
stateRequestHandler, progressHandler)) {
+        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, 
progressHandler)) {
       logger.debug(String.format("Sending value: %s", element));
       // TODO(BEAM-4681): Add support to Flink to support portable timers.
       
Iterables.getOnlyElement(bundle.getInputReceivers().values()).accept(element);
+      // TODO: it would be nice to emit results as they arrive, can thread 
wait non-blocking?
+    }
+
+    // RemoteBundle close blocks until all results are received
+    KV<String, OutputT> result;
+    while ((result = outputQueue.poll()) != null) {
+      outputManager.output(outputMap.get(result.getKey()), (WindowedValue) 
result.getValue());
     }
   }
 
@@ -192,32 +211,4 @@ public void finishBundle() {}
     @ProcessElement
     public void doNothing(ProcessContext context) {}
   }
-
-  /**
-   * Receiver factory that wraps outgoing elements with the corresponding 
union tag for a
-   * multiplexed PCollection.
-   */
-  private static class ReceiverFactory implements OutputReceiverFactory {
-
-    private final Object collectorLock = new Object();
-
-    @GuardedBy("collectorLock")
-    private final BufferedOutputManager<RawUnionValue> collector;
-
-    private final Map<String, TupleTag<?>> outputMap;
-
-    ReceiverFactory(BufferedOutputManager collector, Map<String, TupleTag<?>> 
outputMap) {
-      this.collector = collector;
-      this.outputMap = outputMap;
-    }
-
-    @Override
-    public <OutputT> FnDataReceiver<OutputT> create(String collectionId) {
-      return (receivedElement) -> {
-        synchronized (collectorLock) {
-          collector.output(outputMap.get(collectionId), (WindowedValue) 
receivedElement);
-        }
-      };
-    }
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137658)
    Time Spent: 1h 40m  (was: 1.5h)

> Flink Streaming ExecutableStage operator chain blocks grpc receiver threads 
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-5211
>                 URL: https://issues.apache.org/jira/browse/BEAM-5211
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.6.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability
>             Fix For: 2.7.0
>
>         Attachments: jstack.log
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The operator attempts to emit results as they are received, within the grpc 
> thread, while the subtask thread is waiting for bundle completion. This blocks 
> the grpc threads and eventually leads to a deadlock when multiple stages are 
> within an operator chain. Observed with wordcount; see the attached stack trace.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to