[ 
https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=325077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325077
 ]

ASF GitHub Bot logged work on BEAM-5690:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Oct/19 13:51
            Start Date: 08/Oct/19 13:51
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r332522630
 
 

 ##########
 File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
 ##########
 @@ -451,6 +457,47 @@ public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
     source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
+  @Test
+  public void testInStreamingModeCountByKey() throws Exception {
+    Instant instant = new Instant(0);
+
+    CreateStream<KV<Integer, Long>> kvSource =
+        CreateStream.of(KvCoder.of(VarIntCoder.of(), VarLongCoder.of()), batchDuration())
+            .emptyBatch()
+            .advanceWatermarkForNextBatch(instant)
+            .nextBatch(
+                TimestampedValue.of(KV.of(1, 100L), instant.plus(Duration.standardSeconds(3L))),
+                TimestampedValue.of(KV.of(1, 300L), instant.plus(Duration.standardSeconds(4L))))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(7L)))
+            .nextBatch(
+                TimestampedValue.of(KV.of(1, 400L), instant.plus(Duration.standardSeconds(8L))))
+            .advanceNextBatchWatermarkToInfinity();
+
+    PCollection<KV<Integer, Long>> output =
+        p.apply("create kv Source", kvSource)
+            .apply(
+                "window input",
+                Window.<KV<Integer, Long>>into(FixedWindows.of(Duration.standardSeconds(3L)))
+                    .withAllowedLateness(Duration.ZERO))
+            .apply(Count.perKey());
 
 Review comment:
   You're lucky :). Count is implemented in the SDK as Combine.perKey, which the current Spark runner translates as GBK + ParDo, and the GBK translation calls `SparkGroupAlsoByWindowViaWindowSet`, which contains your fix. So your fix is exercised even though you apply Count rather than GBK. If the runner translated Combine.perKey directly (as the new Spark runner does), you would not be testing your fix.
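   For orientation, a minimal sketch of what a test variant that goes through GroupByKey directly (rather than through Count) might look like, so the GBK translation path and `SparkGroupAlsoByWindowViaWindowSet` are exercised without depending on how Combine.perKey is translated. It reuses `p`, `batchDuration()` and the same kind of `CreateStream` source as the test above; the test name and the no-empty-groups assertion are illustrative assumptions, not part of the PR:

 {code}
  @Test
  public void testInStreamingModeGroupByKey() throws Exception {
    Instant instant = new Instant(0);

    // Same style of unbounded test source as the test in the PR.
    CreateStream<KV<Integer, Long>> kvSource =
        CreateStream.of(KvCoder.of(VarIntCoder.of(), VarLongCoder.of()), batchDuration())
            .emptyBatch()
            .advanceWatermarkForNextBatch(instant)
            .nextBatch(
                TimestampedValue.of(KV.of(1, 100L), instant.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of(KV.of(1, 300L), instant.plus(Duration.standardSeconds(4L))))
            .advanceNextBatchWatermarkToInfinity();

    // Apply GroupByKey explicitly so the runner's GBK translation
    // (SparkGroupAlsoByWindowViaWindowSet) is on the execution path.
    PCollection<KV<Integer, Iterable<Long>>> grouped =
        p.apply("create kv Source", kvSource)
            .apply(
                "window input",
                Window.<KV<Integer, Long>>into(FixedWindows.of(Duration.standardSeconds(3L)))
                    .withAllowedLateness(Duration.ZERO))
            .apply(GroupByKey.<Integer, Long>create());

    // The fix should ensure that no empty (zero-value) groups are emitted.
    PAssert.that(grouped)
        .satisfies(
            groups -> {
              for (KV<Integer, Iterable<Long>> group : groups) {
                if (!group.getValue().iterator().hasNext()) {
                  throw new AssertionError("Unexpected empty group for key " + group.getKey());
                }
              }
              return null;
            });

    p.run();
  }
 {code}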
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 325077)

> Issue with GroupByKey in BeamSql using SparkRunner
> --------------------------------------------------
>
>                 Key: BEAM-5690
>                 URL: https://issues.apache.org/jira/browse/BEAM-5690
>             Project: Beam
>          Issue Type: Task
>          Components: runner-spark
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Reported on user@
> {quote}We are trying to set up a pipeline using BeamSql, with the default trigger (AfterWatermark, i.e. when the watermark passes the end of the window).
> Below is the pipeline:
>   
>    KafkaSource (KafkaIO) 
>        ---> Windowing (FixedWindow 1min)
>        ---> BeamSql
>        ---> KafkaSink (KafkaIO)
>                          
> We are using Spark Runner for this. 
> The BeamSql query is:
> {code}select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3{code}
> We are grouping by Col3, which is a string; it can hold the values string[0-9].
>
> The records are emitted to the Kafka sink at the 1 min mark, but the output records in Kafka are not as expected.
> Below is the output observed (WST and WET indicate the window start time and window end time):
> {code}
> {"count_col1":1,"Col3":"string5","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":3,"Col3":"string7","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":2,"Col3":"string8","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":1,"Col3":"string2","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":1,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  
> +0000","WET":"2018-10-09  09-56-00 0}
> {code}
> {quote}
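For orientation, a rough sketch of the pipeline shape described in the quoted report. The broker address, topic names, schema fields, and the JSON-to-Row/Row-to-JSON DoFns (JsonToRowFn, RowToJsonFn) are assumptions made up for illustration, not taken from the report; imports and the conversion DoFns are omitted:

{code}
// Assumed schema for the incoming records; only Col3 matters for the query.
Schema schema =
    Schema.builder()
        .addStringField("Col1")
        .addStringField("Col2")
        .addStringField("Col3")
        .build();

// KafkaSource (KafkaIO) ---> FixedWindow 1 min
PCollection<Row> rows =
    p.apply(
            "KafkaSource",
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")            // assumed
                .withTopic("input-topic")                        // assumed
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(Values.create())
        .apply("JsonToRow", ParDo.of(new JsonToRowFn(schema)))   // hypothetical DoFn<String, Row>
        .setRowSchema(schema)
        .apply("Windowing", Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

// BeamSql with the reported query.
PCollection<Row> counted =
    rows.apply(
        "BeamSql",
        SqlTransform.query("select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3"));

// KafkaSink (KafkaIO)
counted
    .apply("RowToJson", ParDo.of(new RowToJsonFn()))             // hypothetical DoFn<Row, String>
    .apply(
        "KafkaSink",
        KafkaIO.<Void, String>write()
            .withBootstrapServers("broker:9092")                 // assumed
            .withTopic("output-topic")                            // assumed
            .withValueSerializer(StringSerializer.class)
            .values());
{code}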



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
