[ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113270
 ]

ASF GitHub Bot logged work on BEAM-4194:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jun/18 17:28
            Start Date: 19/Jun/18 17:28
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on a change in pull request #5682: [BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196512057
 
 

 ##########
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##########
 @@ -127,6 +194,66 @@ private static PipelineResult run(
     return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue<Object> values = new ConcurrentLinkedQueue<Object>();
+
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"));
+
+    LimitStateWrapper stateWrapper = new LimitStateWrapper();
+    LimitCounter.globalValues.put(id, values);
+    LimitCounter.globalLimitArguments.put(id, getLimitCount(node));
+    LimitCounter.globalStates.put(id, stateWrapper);
+    limitRun(options, node, new LimitCounter(), stateWrapper);
+    LimitCounter.globalValues.remove(id);
+    LimitCounter.globalLimitArguments.remove(id);
+    LimitCounter.globalStates.remove(id);
+
+    return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCounter extends DoFn<Row, Void> {
+    private static final Map<Long, Integer> globalLimitArguments =
 
 Review comment:
   In the `DirectRunner` this is OK. I think you could pretty easily make it a 
bit more general by using a stateful `ParDo`/`DoFn` for counting. Then the only 
thing left to make it work on a distributed runner is a communication path back 
to the shell. If you leave it like this, I think the `DoFn` is also not ready 
for a distributed run, because there will be many copies of it in separate JVMs 
on different workers, the elements will not be gathered to one key, etc.
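
   To illustrate the concern, here is a minimal plain-Java model (a sketch, not 
Beam code and not the code in this PR). It assumes each worker JVM ends up with 
its own copy of the counter, and contrasts that with a count kept per key after 
all rows are routed to a single key, which is roughly what a stateful `DoFn` 
would provide. The names `LimitSketch`, `Worker`, `limitWithPerWorkerCounters`, 
and `limitWithKeyedState` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only; no Beam classes are used. All names are hypothetical.
class LimitSketch {

  /** Models one worker JVM that holds its own copy of a static-style counter. */
  static final class Worker {
    private int seen = 0; // local to this worker; other workers cannot see it

    List<String> process(List<String> input, int limit) {
      List<String> out = new ArrayList<>();
      for (String row : input) {
        if (seen < limit) {
          seen++;
          out.add(row);
        }
      }
      return out;
    }
  }

  /** Each shard is handled by a separate worker with a separate counter. */
  static List<String> limitWithPerWorkerCounters(List<List<String>> shards, int limit) {
    List<String> out = new ArrayList<>();
    for (List<String> shard : shards) {
      out.addAll(new Worker().process(shard, limit));
    }
    return out;
  }

  /** Models per-key state: every row is routed to one key that owns the count. */
  static List<String> limitWithKeyedState(List<List<String>> shards, int limit) {
    final int singleKey = 0; // assumption: all rows are gathered to a single key
    Map<Integer, Integer> countByKey = new HashMap<>();
    List<String> out = new ArrayList<>();
    for (List<String> shard : shards) {
      for (String row : shard) {
        int seen = countByKey.getOrDefault(singleKey, 0);
        if (seen < limit) {
          countByKey.put(singleKey, seen + 1);
          out.add(row);
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> shards =
        Arrays.asList(Arrays.asList("a", "b", "c"), Arrays.asList("d", "e", "f"));
    // Two workers, each applying LIMIT 2 on its own: four rows come out.
    System.out.println(limitWithPerWorkerCounters(shards, 2)); // [a, b, d, e]
    // One key owning the count: exactly two rows come out.
    System.out.println(limitWithKeyedState(shards, 2)); // [a, b]
  }
}
```

   In this model the per-worker version over-emits because no counter sees all 
of the rows. Keying everything to one place fixes the count, but the results 
would still need a path back to the shell.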

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 113270)

> [SQL] Support LIMIT on Unbounded Data
> -------------------------------------
>
>                 Key: BEAM-4194
>                 URL: https://issues.apache.org/jira/browse/BEAM-4194
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> We need to support queries with "LIMIT xxx".
> The problem is that we don't know when aggregates will trigger; they can 
> potentially accumulate values in the global window and never trigger.
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar 
> to what we have at the moment, where the user defines the trigger upstream 
> for all inputs. In this case LIMIT can probably be implemented as 
> Sample.any(5) with a trigger at count.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
