[ 
https://issues.apache.org/jira/browse/BEAM-10529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520969#comment-17520969
 ] 

matthias hiltpold commented on BEAM-10529:
------------------------------------------

Here is the full error. I'm currently running it on the DirectRunner, as I 
have not yet had time to dockerize the source and deploy to Dataflow.
{quote}message: "Exception while trying to handle InstructionRequest bundle_3"
trace: "org.apache.beam.sdk.util.UserCodeException: 
java.lang.IllegalArgumentException: Unable to encode element 
\'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466\' with 
coder \'SchemaCoder<Schema: Fields:\nField\{name=topic, description=, 
type=STRING NOT NULL, options={{}}}\nField\{name=partition, description=, 
type=INT32 NOT NULL, options={{}}}\nField\{name=offset, description=, 
type=INT64 NOT NULL, options={{}}}\nField\{name=timestamp, description=, 
type=INT64 NOT NULL, options={{}}}\nField\{name=key, description=, type=BYTES 
NOT NULL, options={{}}}\nField\{name=value, description=, type=BYTES NOT NULL, 
options={{}}}\nField\{name=headers, description=, type=ARRAY<ROW<key STRING NOT 
NULL, value BYTES NOT NULL> NOT NULL> NOT NULL, 
options={{}}}\nField\{name=timestampTypeId, description=, type=INT32 NOT NULL, 
options={{}}}\nField\{name=timestampTypeName, description=, type=STRING NOT 
NULL, options={{}}}\nEncoding positions:\n\{headers=6, timestampTypeName=8, 
partition=1, offset=2, topic=0, value=5, key=4, timestamp=3, 
timestampTypeId=7}\nOptions:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d  
UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03\'.\n\tat 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1763)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)\n\tat
 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat
 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)\n\tat
 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)\n\tat
 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)\n\tat
 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)\n\tat
 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
 
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)\n\tat
 
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)\n\tat
 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)\n\tat
 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)\n\tat
 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)\n\tat
 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)\n\tat
 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
 java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: 
java.lang.IllegalArgumentException: Unable to encode element 
\'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466\' with 
coder \'SchemaCoder<Schema: Fields:\nField\{name=topic, description=, 
type=STRING NOT NULL, options={{}}}\nField\{name=partition, description=, 
type=INT32 NOT NULL, options={{}}}\nField\{name=offset, description=, 
type=INT64 NOT NULL, options={{}}}\nField\{name=timestamp, description=, 
type=INT64 NOT NULL, options={{}}}\nField\{name=key, description=, type=BYTES 
NOT NULL, options={{}}}\nField\{name=value, description=, type=BYTES NOT NULL, 
options={{}}}\nField\{name=headers, description=, type=ARRAY<ROW<key STRING NOT 
NULL, value BYTES NOT NULL> NOT NULL> NOT NULL, 
options={{}}}\nField\{name=timestampTypeId, description=, type=INT32 NOT NULL, 
options={{}}}\nField\{name=timestampTypeName, description=, type=STRING NOT 
NULL, options={{}}}\nEncoding positions:\n\{headers=6, timestampTypeName=8, 
partition=1, offset=2, topic=0, value=5, key=4, timestamp=3, 
timestampTypeId=7}\nOptions:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d  
UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03\'.\n\tat 
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)\n\tat
 
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)\n\tat 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\nCaused
 by: java.lang.RuntimeException: Null value set on non-nullable field 
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}\n\tat 
org.apache.beam.sdk.values.RowWithGetters.getValue(RowWithGetters.java:76)\n\tat
 
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:341)\n\tat
 org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown 
Source)\n\tat 
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown 
Source)\n\tat 
org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)\n\tat 
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)\n\tat
 
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)\n\tat 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)\n\tat
 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat
 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)\n\tat
 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)\n\tat
 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)\n\tat
 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)\n\tat
 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
 
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)\n\tat
 
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)\n\tat 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
 
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)\n\tat
 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)\n\tat
 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)\n\tat
 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)\n\tat
 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)\n\tat
 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
 java.base/java.lang.Thread.run(Thread.java:829)\n"
instruction_id: "bundle_3"
log_location: "org.apache.beam.fn.harness.control.BeamFnControlClient"
thread: "23"

E0412 09:45:46.756133600   24697 fork_posix.cc:70]           Fork support is 
only compatible with the epoll1 and poll polling strategies
E0412 09:45:47.385400200   24450 fork_posix.cc:70]           Fork support is 
only compatible with the epoll1 and poll polling strategies
dd7266e1dd24cf4a016c10a7effc1c50868af5bdb7e8af945ad815a37f410bad
Traceback (most recent call last):
  File "main.py", line 133, in <module>
    run(
  File "/home/<path>beam-pipeline/kafka2gcp/pipeline.py", line 170, in run
    (
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/pipeline.py", 
line 596, in __exit__
    self.result = self.run()
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/pipeline.py", 
line 573, in run
    return self.runner.run_pipeline(self, self._options)
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
 line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 199, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 208, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 408, in run_stages
    bundle_results = self._execute_bundle(
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 736, in _execute_bundle
    self._run_bundle(
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 966, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File 
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1315, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException: 
java.lang.IllegalArgumentException: Unable to encode element 
'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466' with coder 
'SchemaCoder<Schema: Fields:
Field\{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field\{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=key, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value 
BYTES NOT NULL> NOT NULL> NOT NULL, options={{}}}
Field\{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{headers=6, timestampTypeName=8, partition=1, offset=2, topic=0, value=5, 
key=4, timestamp=3, timestampTypeId=7}
Options:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d  UUID: 
99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03'.
        at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1763)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)
        at 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)
        at 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
        at 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)
        at 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
        at 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
        at 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)
        at 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)
        at 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at 
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)
        at 
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
        at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 
'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466' with coder 
'SchemaCoder<Schema: Fields:
Field\{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field\{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=key, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value 
BYTES NOT NULL> NOT NULL> NOT NULL, options={{}}}
Field\{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{headers=6, timestampTypeName=8, partition=1, offset=2, topic=0, value=5, 
key=4, timestamp=3, timestampTypeId=7}
Options:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d  UUID: 
99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder: 
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03'.
        at 
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        at 
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
Caused by: java.lang.RuntimeException: Null value set on non-nullable field 
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}
        at 
org.apache.beam.sdk.values.RowWithGetters.getValue(RowWithGetters.java:76)
        at 
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:341)
        at org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown 
Source)
        at org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown 
Source)
        at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
        at 
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
        at 
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)
        at 
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)
        at 
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
        at 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)
        at 
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
        at 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
        at 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)
        at 
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)
        at 
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at 
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)
        at 
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
        at 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
        at 
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
        at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        at 
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
        at 
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at 
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
{quote}

> Kafka XLang fails for "empty" key/values
> ----------------------------------------
>
>                 Key: BEAM-10529
>                 URL: https://issues.apache.org/jira/browse/BEAM-10529
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, io-java-kafka
>            Reporter: Luke Cwik
>            Assignee: John Casey
>            Priority: P1
>             Fix For: 2.38.0
>
>          Time Spent: 25h 20m
>  Remaining Estimate: 0h
>
> According to their Javadoc, ByteArrayDeserializer and StringDeserializer 
> can return null[1, 2], but we aren't using 
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that KafkaIO 
> does this correctly in its regular coder inference logic[4].
> 1: 
> [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-]
> 2: 
> [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-]
> 3: 
> [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478]
> 4: 
> [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85]
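
To illustrate why wrapping the byte[] coder in a nullable coder matters for records whose key or value is null, here is a minimal stdlib-only sketch. It is not Beam's NullableCoder and does not reproduce Beam's wire format; the class name NullableBytesSketch, its method names, and the marker values are all hypothetical and chosen only for illustration. The idea shown is a one-byte presence marker written before the payload, so that null becomes encodable instead of being rejected.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; not Beam's NullableCoder or ByteArrayCoder.
class NullableBytesSketch {
    // Assumed marker values for this sketch.
    private static final byte NULL_MARKER = 0;
    private static final byte PRESENT_MARKER = 1;

    // Stand-in for a non-nullable byte[] coder: it rejects null outright,
    // similar in spirit to the "Null value set on non-nullable field" failure.
    static byte[] encodeRequired(byte[] value) {
        if (value == null) {
            throw new IllegalArgumentException("cannot encode null as a non-nullable value");
        }
        // Length prefix followed by the raw bytes.
        return ByteBuffer.allocate(4 + value.length).putInt(value.length).put(value).array();
    }

    // Nullable wrapper: one marker byte, then the delegate encoding if present.
    static byte[] encodeNullable(byte[] value) {
        if (value == null) {
            return new byte[] {NULL_MARKER};
        }
        byte[] payload = encodeRequired(value);
        return ByteBuffer.allocate(1 + payload.length).put(PRESENT_MARKER).put(payload).array();
    }

    // Inverse of encodeNullable: returns null when the marker says so.
    static byte[] decodeNullable(byte[] encoded) {
        ByteBuffer buffer = ByteBuffer.wrap(encoded);
        byte marker = buffer.get();
        if (marker == NULL_MARKER) {
            return null;
        }
        if (marker != PRESENT_MARKER) {
            throw new IllegalArgumentException("unknown marker: " + marker);
        }
        byte[] value = new byte[buffer.getInt()];
        buffer.get(value);
        return value;
    }

    public static void main(String[] args) {
        // A Kafka record with a null value (e.g. a tombstone) round-trips cleanly.
        byte[] encoded = encodeNullable(null);
        System.out.println("encoded length for null: " + encoded.length);
        System.out.println("decoded is null: " + (decodeNullable(encoded) == null));
    }
}
```

In this sketch, encodeRequired(null) throws, while encodeNullable(null) yields a single marker byte that decodes back to null. That is the behaviour the description asks for when it suggests NullableCoder.of(ByteArrayCoder.of()) in the expansion.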



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
