huangjianhuang created BEAM-3423:
------------------------------------

             Summary: Distinct.withRepresentativeValueFn with EventTime Trigger throws Exceptions
                 Key: BEAM-3423
                 URL: https://issues.apache.org/jira/browse/BEAM-3423
             Project: Beam
          Issue Type: Bug
          Components: runner-core, runner-direct
    Affects Versions: 2.2.0
         Environment: ubuntu16.04, idea, java8
            Reporter: huangjianhuang
            Assignee: Kenneth Knowles


My code is as follows:


{code:java}
pipeline
                //Read data
                .apply("Read from kafka",
                        KafkaIO.<String, String>read()
                                .withBootstrapServers("localhost:9092")
                                .withTopic(topic)
                                .withKeyDeserializer(StringDeserializer.class)
                                .withValueDeserializer(StringDeserializer.class)
                                .withoutMetadata()
                )
                .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(10)))
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardSeconds(5))))
                        .discardingFiredPanes().withAllowedLateness(Duration.ZERO))
                //works fine
//                .apply(Distinct.create())
                //oops! -> CoderException: cannot encode a null KV
                .apply(Distinct.withRepresentativeValueFn(new Val())
                        .withRepresentativeType(TypeDescriptors.strings()))
                .apply(MapElements.into(TypeDescriptors.nulls())
                        .via(input -> {
                            System.out.println(Instant.now());
                            System.out.println(input);
                            return null;
                        }));

    private static class Val implements SerializableFunction<KV<String, String>, String> {
        @Override
        public String apply(KV<String, String> input) {
            return input.getValue();
        }
    }
{code}

Input words to Kafka:
word1
//after 10s
word2

Then I got the following exception:

{code:java}
begin
2018-01-06T11:18:52.971Z
KV{null, a}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
        at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
        at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62)
        at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106)
        at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44)
        at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111)
        at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
        at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149)
        at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70)
        at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182)
        at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

{code}

But if I use .apply(Distinct.create()) instead, it works fine.
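
For reference, the result I would expect within a single window can be sketched in plain Java, without Beam. This is only an illustration of "one element per representative value". The names DistinctSketch, distinctBy and kv are hypothetical and not part of the Beam API, and keeping the first element seen is an assumption (Beam may keep any of the duplicates).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java sketch; it does not use or reproduce Beam internals.
final class DistinctSketch {

    // Keeps one element per representative value (here: the first one seen).
    static <T, K> List<T> distinctBy(List<T> elements,
                                     Function<? super T, ? extends K> representativeFn) {
        Map<K, T> firstSeen = new LinkedHashMap<>();
        for (T element : elements) {
            firstSeen.putIfAbsent(representativeFn.apply(element), element);
        }
        return new ArrayList<>(firstSeen.values());
    }

    // Stand-in for a Beam KV; the key may be null, as with Kafka records without a key.
    static Map.Entry<String, String> kv(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> window = Arrays.asList(
                kv(null, "word1"), kv(null, "word1"), kv(null, "word2"));
        // Same role as the Val function above: the record value is the representative.
        List<Map.Entry<String, String>> distinct = distinctBy(window, Map.Entry::getValue);
        System.out.println(distinct);
    }
}
```

In other words, I expect one non-null KV per distinct value in each fired pane, never a null element.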




