[ https://issues.apache.org/jira/browse/BEAM-12075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312586#comment-17312586 ]

Tianzi Cai commented on BEAM-12075:
-----------------------------------

Thanks Kenneth. I just tried the second example (the one with `GroupByKey`) on
`DataflowRunner`. The flattened elements do get printed in the worker logs, but I
also see many `java.lang.NullPointerException` errors (see below), which I never
saw with `DirectRunner`. This suggests that the problem is likely specific to
`DirectRunner`.

I set up logging as [described|https://cloud.google.com/dataflow/docs/guides/logging#worker_log_message_code_example]
in the Dataflow documentation, using `org.slf4j.Logger` and `org.slf4j.LoggerFactory`.

 
{code:java}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1060)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1063)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:773)
	at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
	at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:81)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.transforms.FlatMapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
	... 23 more
Caused by: java.lang.NullPointerException
	at org.apache.beam.sdk.transforms.FlatMapElements$2.processElement(FlatMapElements.java:155)
{code}
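The innermost frame in this trace is `FlatMapElements$2.processElement(FlatMapElements.java:155)`, reached from the `GroupByKey` output, so on Dataflow the grouping did fire and the exception is raised inside the `FlatMapElements` step. `FlatMapElements` iterates over the `Iterable` that the `via` lambda returns, so the `return null;` in the second example is a likely cause of these NPEs, separate from the `DirectRunner` question. The plain-Java sketch below (no Beam dependency) models that loop; `flatMapOne`, `RETURNS_NULL`, `RETURNS_EMPTY` and `throwsNpe` are hypothetical names, and the loop is a simplified stand-in, not Beam's source.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class FlatMapNullDemo {

    // Hypothetical, simplified stand-in for the per-element loop in FlatMapElements:
    // apply the user function, then iterate the Iterable it returns and emit each item.
    // This models the behaviour; it is not Beam's source code.
    static <InT, OutT> void flatMapOne(
            InT element, Function<InT, Iterable<OutT>> fn, Consumer<OutT> emit) {
        Iterable<OutT> results = fn.apply(element);
        for (OutT out : results) { // a null 'results' throws NullPointerException here
            emit.accept(out);
        }
    }

    // Mirrors the lambda in the second example: log every grouped value, then return null.
    static final Function<Iterable<String>, Iterable<Void>> RETURNS_NULL = values -> {
        values.forEach(v -> System.out.println("Message: " + v));
        return null;
    };

    // Same side effect, but returns an empty Iterable, so there is simply nothing to emit.
    static final Function<Iterable<String>, Iterable<Void>> RETURNS_EMPTY = values -> {
        values.forEach(v -> System.out.println("Message: " + v));
        return Collections.emptyList();
    };

    // Returns true if running fn through the flat-map loop throws NullPointerException.
    static boolean throwsNpe(Function<Iterable<String>, Iterable<Void>> fn, Iterable<String> input) {
        try {
            flatMapOne(input, fn, out -> { });
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> grouped = Arrays.asList("a", "b", "c");
        System.out.println("returns null  -> NPE? " + throwsNpe(RETURNS_NULL, grouped));
        System.out.println("returns empty -> NPE? " + throwsNpe(RETURNS_EMPTY, grouped));
    }
}
```

Both functions log every value before returning, which matches seeing the messages in the worker logs alongside the NPEs; only the null-returning one fails afterwards. In the actual pipeline, the analogous change would presumably be to return `Collections.emptyList()` from the `via` lambda instead of `null`.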
 

> GroupByKey doesn't seem to work with FixedWindows
> -------------------------------------------------
>
>                 Key: BEAM-12075
>                 URL: https://issues.apache.org/jira/browse/BEAM-12075
>             Project: Beam
>          Issue Type: Bug
>          Components: extensions-java-gcp
>    Affects Versions: 2.28.0
>         Environment: Java 8, 
>            Reporter: Tianzi Cai
>            Priority: P2
>              Labels: Grouping, PubSubIO, Windowing
>
> After applying `FixedWindows` to a streaming source, a `GroupByKey` operation
> doesn't emit the keyed elements of a window.
> This example without `GroupByKey` prints all the windowed elements:
>  
> {noformat}
> pipeline
>  .apply("ReadFromPubsub", PubsubIO.readStrings().fromSubscription(subscriptionPath))
>  .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5L))))
>  .apply(WithKeys.of("bobcat"))
>  .apply(MapElements.into(TypeDescriptors.nulls()).via(
>      (KV<String, String> pair) -> {
>          LOG.info("Key: " + pair.getKey() + "\tValue: " + pair.getValue());
>          return null;
>      }
>  ));
> {noformat}
>  
> This example with `GroupByKey` doesn't emit anything:
>  
> {noformat}
> pipeline
>  .apply("ReadFromPubsub", PubsubIO.readStrings().fromSubscription(subscriptionPath))
>  .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5L))))
>  .apply(WithKeys.of("bobcat"))
>  .apply(GroupByKey.create())
>  .apply(FlatMapElements.into(TypeDescriptors.nulls()).via(
>      (KV<String, Iterable<String>> pair) -> {
>          pair.getValue().forEach(message -> LOG.info("Message: " + message));
>          return null;
>      }
>  ));
> {noformat}
>  
> I'm using DirectRunner. The same logic works for Python using both the 
> DirectRunner and DataflowRunner.
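
For reference, the behaviour the issue expects from `Window.into(FixedWindows.of(Duration.standardSeconds(5L)))` followed by `GroupByKey` is one group per key per 5-second window. In Beam's model, `GroupByKey` groups by key and window, and with the default trigger a window's group is emitted once the watermark passes the end of that window, so a pipeline whose watermark does not advance emits nothing from `GroupByKey`. The plain-Java sketch below models that under simplifying assumptions (zero window offset; allowed lateness, late data and non-default triggers ignored); `Event`, `windowStart`, `groupReadyWindows` and `sampleEvents` are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowGroupingSketch {

    // Hypothetical simplified element: key, value and event-time timestamp in millis.
    static final class Event {
        final String key;
        final String value;
        final long timestampMillis;

        Event(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the fixed window [start, start + size) that contains the timestamp,
    // assuming a zero offset; floorMod keeps negative timestamps in the right window.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups values by (key, window) and returns only the groups whose window has
    // ended at or before the watermark: a simplified model of the default trigger.
    static Map<String, List<String>> groupReadyWindows(
            List<Event> events, long sizeMillis, long watermarkMillis) {
        Map<String, List<String>> ready = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestampMillis, sizeMillis);
            long end = start + sizeMillis;
            if (end > watermarkMillis) {
                continue; // window still open: nothing is emitted for it yet
            }
            String group = e.key + "@[" + start + "," + end + ")";
            ready.computeIfAbsent(group, k -> new ArrayList<>()).add(e.value);
        }
        return ready;
    }

    // Three messages keyed "bobcat", as WithKeys.of("bobcat") would produce.
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("bobcat", "m1", 1_000L),
                new Event("bobcat", "m2", 4_000L),
                new Event("bobcat", "m3", 6_000L));
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second fixed windows
        // Watermark stuck at 3 s: no window has closed, so nothing is emitted.
        System.out.println(groupReadyWindows(sampleEvents(), size, 3_000L));
        // Watermark at 5 s: the [0, 5000) window fires with m1 and m2.
        System.out.println(groupReadyWindows(sampleEvents(), size, 5_000L));
    }
}
```

With the watermark held at 3 s the result is empty, which is what "doesn't emit anything" looks like; once it reaches 5 s the `bobcat` group for the first window appears, while `m3` stays buffered in the still-open [5000, 10000) window.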



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
