[ https://issues.apache.org/jira/browse/BEAM-6612?focusedWorklogId=196073&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196073 ]

ASF GitHub Bot logged work on BEAM-6612:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Feb/19 01:59
            Start Date: 08/Feb/19 01:59
    Worklog Time Spent: 10m 
    Work Description: ajamato commented on pull request #7764: [BEAM-6612] Modify QueueingBeamFnDataClient to become MetricsBeamFnDataClient and make all processElement() calls occur in parallel again.
URL: https://github.com/apache/beam/pull/7764#discussion_r254934457
 
 

 ##########
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/MetricsBeamFnDataClient.java
 ##########
 @@ -133,50 +126,40 @@ public void drainAndBlock() throws Exception {
   }
 
   /**
-   * The QueueingFnDataReceiver is a a FnDataReceiver used by the QueueingBeamFnDataClient.
+   * The MetricsFnDataReceiver is a a FnDataReceiver used by the MetricsBeamFnDataClient.
    *
    * <p>All {@link #accept accept()ed} values will be put onto a synchronous queue which will cause
-   * the calling thread to block until {@link QueueingBeamFnDataClient#drainAndBlock} is called.
-   * {@link QueueingBeamFnDataClient#drainAndBlock} is responsible for processing values from the
+   * the calling thread to block until {@link MetricsBeamFnDataClient#drainAndBlock} is called.
+   * {@link MetricsBeamFnDataClient#drainAndBlock} is responsible for processing values from the
    * queue.
    */
-  public class QueueingFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
+  public class MetricsFnDataReceiver<T> implements FnDataReceiver<WindowedValue<T>> {
     private final FnDataReceiver<WindowedValue<T>> consumer;
-    public InboundDataClient inboundDataClient;
 
-    public QueueingFnDataReceiver(FnDataReceiver<WindowedValue<T>> consumer) {
+    public MetricsFnDataReceiver(FnDataReceiver<WindowedValue<T>> consumer) {
       this.consumer = consumer;
     }
 
     /**
      * This method is thread safe, we expect multiple threads to call this, passing in data when new
-     * data arrives via the QueueingBeamFnDataClient's mainClient.
+     * data arrives via the MetricsBeamFnDataClient's mainClient.
+     *
+     * <p>This code is invoked CONCURRENTLY, by separate threads.
      */
     @Override
     public void accept(WindowedValue<T> value) throws Exception {
       try {
-        ConsumerAndData offering = new ConsumerAndData(this.consumer, value);
-        while (!queue.offer(offering, 200, TimeUnit.MILLISECONDS)) {
-          if (inboundDataClient.isDone()) {
-            // If it was cancelled by the consuming side of the queue.
-            break;
-          }
+        try (Closeable close = MetricsContainerStepMapEnvironment.setupMetricEnvironment()) {
 
 Review comment:
  This is the major change: repurposing this class so that when elements come
in on the gRPC channel, we create a new MetricsContainerStepMap and
ExecutionStateTracker for each element as a thread-local variable, keep track
of them in a ConcurrentLinkedQueue, and then iterate over all the
MetricsContainerStepMaps at the end of the bundle and merge them to get the
MonitoringInfos.
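The per-element scheme described in the review comment can be sketched in plain Java. This is a minimal, hypothetical model, not the code in PR #7764: `StepMetrics`, `BundleMetrics`, `Scope` and `enterElementScope()` are invented names standing in for `MetricsContainerStepMap` and `MetricsContainerStepMapEnvironment.setupMetricEnvironment()`, and a thread pool stands in for the gRPC threads that call `accept()` concurrently. Each element gets its own non-thread-safe container installed in a `ThreadLocal`, every container is recorded in a `ConcurrentLinkedQueue`, and all containers are merged once the bundle has finished.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: per-element metric containers merged at bundle end.
 * These classes are illustrative, NOT Beam's MetricsContainerStepMap API.
 */
public class PerElementMetricsSketch {

  /** Deliberately not thread-safe, like the containers discussed in the issue. */
  static final class StepMetrics {
    final Map<String, Long> counters = new HashMap<>();

    void inc(String name, long delta) {
      counters.merge(name, delta, Long::sum);
    }

    void mergeInto(Map<String, Long> target) {
      counters.forEach((k, v) -> target.merge(k, v, Long::sum));
    }
  }

  static final class BundleMetrics {
    /** AutoCloseable whose close() throws no checked exception. */
    interface Scope extends AutoCloseable {
      @Override
      void close();
    }

    // Every per-element container is registered here so it can be merged later.
    private final ConcurrentLinkedQueue<StepMetrics> all = new ConcurrentLinkedQueue<>();
    // The container for the element currently being processed on this thread.
    private final ThreadLocal<StepMetrics> current = new ThreadLocal<>();

    /** Installs a fresh container for the calling thread; close() uninstalls it. */
    Scope enterElementScope() {
      StepMetrics metrics = new StepMetrics();
      all.add(metrics);
      current.set(metrics);
      return current::remove;
    }

    /** Called by user code while an element is being processed. */
    void inc(String name, long delta) {
      current.get().inc(name, delta);
    }

    /** Called once, after every element of the bundle has finished. */
    Map<String, Long> mergeAll() {
      Map<String, Long> merged = new HashMap<>();
      for (StepMetrics m : all) {
        m.mergeInto(merged);
      }
      return merged;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BundleMetrics bundle = new BundleMetrics();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 1000; i++) {
      // Each task stands in for one concurrent accept() call.
      pool.execute(() -> {
        try (BundleMetrics.Scope scope = bundle.enterElementScope()) {
          bundle.inc("elements", 1);
        }
      });
    }
    // Waiting for termination acts as the end-of-bundle barrier.
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println(bundle.mergeAll()); // {elements=1000}
  }
}
```

Because no container is shared between threads while elements are being processed, the containers themselves need no locking; only the registration queue has to be concurrent. The merge is only safe once every element thread has finished, which the sketch enforces with `awaitTermination`.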
 


Issue Time Tracking
-------------------

            Worklog Id:     (was: 196073)
            Time Spent: 10m
    Remaining Estimate: 0h

> Remove QueueingBeamFnDataClient
> -------------------------------
>
>                 Key: BEAM-6612
>                 URL: https://issues.apache.org/jira/browse/BEAM-6612
>             Project: Beam
>          Issue Type: New Feature
>          Components: java-fn-execution
>            Reporter: Alex Amato
>            Assignee: Alex Amato
>            Priority: Major
>              Labels: triaged
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Remove QueueingBeamFnDataClient, which made all process() calls run on the
> same thread.
> [~lcwik] and I came up with this design thinking that it was required to
> process the bundle in parallel anyway, and that we would have good performance.
> However, after speaking to Ken, there is no requirement for a bundle or key to
> be processed in parallel. Elements are either iterables or single elements,
> which defines when a group of elements needs to be processed on the same thread.
> Simply performing this change will lead to the following issues:
> (1) MetricsContainerImpl and MetricsContainer are not thread-safe, so when
> the process() functions enter the metric container context, they will be
> accessing a thread-unsafe collection in parallel.
> (2) An ExecutionStateTracker will be needed in every thread, so we will need
> to create an instance and activate it in every gRPC thread that receives a new
> element.
> (Will this be sampled properly, since the trackers will be short-lived?)
> (3) Will the SimpleExecutionStates being used need to be thread-safe as well?
> I don't think so, because I don't think that the ExecutionStateSampler
> invokes them in parallel.
>  
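The sampling question in item (2) can be made concrete with a toy model. The sketch below assumes a sampler that, on each tick, charges the elapsed period to the state of every tracker that is active at that moment; `Sampler`, `Tracker`, `activate()` and `sampleOnce()` are hypothetical names, not Beam's `ExecutionStateSampler` or `ExecutionStateTracker` API, and ticks are driven by hand so the outcome is deterministic.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical toy model of periodic sampling with short-lived trackers.
 * NOT Beam's ExecutionStateSampler/ExecutionStateTracker implementation.
 */
public class ShortLivedTrackerSketch {

  static final class Sampler {
    // Trackers currently activated on some thread.
    private final Set<Tracker> active = ConcurrentHashMap.newKeySet();
    private final Map<String, Long> millisByState = new ConcurrentHashMap<>();

    Tracker activate(String state) {
      Tracker tracker = new Tracker(state, this);
      active.add(tracker);
      return tracker;
    }

    /** One tick: charge the elapsed period to each currently active tracker. */
    void sampleOnce(long periodMillis) {
      for (Tracker t : active) {
        millisByState.merge(t.state, periodMillis, Long::sum);
      }
    }

    /** Sorted snapshot of the accumulated time per state. */
    Map<String, Long> totals() {
      return new TreeMap<>(millisByState);
    }
  }

  static final class Tracker implements AutoCloseable {
    final String state;
    private final Sampler sampler;

    Tracker(String state, Sampler sampler) {
      this.state = state;
      this.sampler = sampler;
    }

    @Override
    public void close() {
      // Once deactivated, the tracker is invisible to later ticks.
      sampler.active.remove(this);
    }
  }

  public static void main(String[] args) {
    Sampler sampler = new Sampler();
    try (Tracker longLived = sampler.activate("process-A")) {
      sampler.sampleOnce(200); // process-A is active during this tick
    }
    try (Tracker shortLived = sampler.activate("process-B")) {
      // A fast element is processed entirely between two ticks.
    }
    sampler.sampleOnce(200); // nothing is active, so process-B is never charged
    System.out.println(sampler.totals()); // {process-A=200}
  }
}
```

Under this assumption, a tracker that is activated and closed entirely between two ticks is never charged, so time spent on very short elements can go unattributed unless each tracker also accounts for its own elapsed time when it closes.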



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
