[
https://issues.apache.org/jira/browse/BEAM-10529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17520969#comment-17520969
]
matthias hiltpold commented on BEAM-10529:
------------------------------------------
Here is the full Error. Currently I'm running it on DirectRunner, as I had no
time yet to dockerize the source and deploy to DataFlow.
{quote}message: "Exception while trying to handle InstructionRequest bundle_3"
trace: "org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Unable to encode element
\'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466\' with
coder \'SchemaCoder<Schema: Fields:\nField\{name=topic, description=,
type=STRING NOT NULL, options={{}}}\nField\{name=partition, description=,
type=INT32 NOT NULL, options={{}}}\nField\{name=offset, description=,
type=INT64 NOT NULL, options={{}}}\nField\{name=timestamp, description=,
type=INT64 NOT NULL, options={{}}}\nField\{name=key, description=, type=BYTES
NOT NULL, options={{}}}\nField\{name=value, description=, type=BYTES NOT NULL,
options={{}}}\nField\{name=headers, description=, type=ARRAY<ROW<key STRING NOT
NULL, value BYTES NOT NULL> NOT NULL> NOT NULL,
options={{}}}\nField\{name=timestampTypeId, description=, type=INT32 NOT NULL,
options={{}}}\nField\{name=timestampTypeName, description=, type=STRING NOT
NULL, options={{}}}\nEncoding positions:\n\{headers=6, timestampTypeName=8,
partition=1, offset=2, topic=0, value=5, key=4, timestamp=3,
timestampTypeId=7}\nOptions:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d
UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder:
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03\'.\n\tat
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1763)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)\n\tat
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)\n\tat
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)\n\tat
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)\n\tat
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)\n\tat
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)\n\tat
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)\n\tat
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)\n\tat
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)\n\tat
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)\n\tat
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)\n\tat
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
java.lang.IllegalArgumentException: Unable to encode element
\'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466\' with
coder \'SchemaCoder<Schema: Fields:\nField\{name=topic, description=,
type=STRING NOT NULL, options={{}}}\nField\{name=partition, description=,
type=INT32 NOT NULL, options={{}}}\nField\{name=offset, description=,
type=INT64 NOT NULL, options={{}}}\nField\{name=timestamp, description=,
type=INT64 NOT NULL, options={{}}}\nField\{name=key, description=, type=BYTES
NOT NULL, options={{}}}\nField\{name=value, description=, type=BYTES NOT NULL,
options={{}}}\nField\{name=headers, description=, type=ARRAY<ROW<key STRING NOT
NULL, value BYTES NOT NULL> NOT NULL> NOT NULL,
options={{}}}\nField\{name=timestampTypeId, description=, type=INT32 NOT NULL,
options={{}}}\nField\{name=timestampTypeName, description=, type=STRING NOT
NULL, options={{}}}\nEncoding positions:\n\{headers=6, timestampTypeName=8,
partition=1, offset=2, topic=0, value=5, key=4, timestamp=3,
timestampTypeId=7}\nOptions:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d
UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder:
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03\'.\n\tat
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)\n\tat
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\nCaused
by: java.lang.RuntimeException: Null value set on non-nullable field
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}\n\tat
org.apache.beam.sdk.values.RowWithGetters.getValue(RowWithGetters.java:76)\n\tat
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:341)\n\tat
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown
Source)\n\tat
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown
Source)\n\tat
org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)\n\tat
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)\n\tat
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)\n\tat
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)\n\tat
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)\n\tat
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)\n\tat
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)\n\tat
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)\n\tat
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)\n\tat
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)\n\tat
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
Source)\n\tat
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)\n\tat
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)\n\tat
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)\n\tat
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)\n\tat
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)\n\tat
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)\n\tat
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)\n\tat
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\n"
instruction_id: "bundle_3"
log_location: "org.apache.beam.fn.harness.control.BeamFnControlClient"
thread: "23"
E0412 09:45:46.756133600 24697 fork_posix.cc:70] Fork support is
only compatible with the epoll1 and poll polling strategies
E0412 09:45:47.385400200 24450 fork_posix.cc:70] Fork support is
only compatible with the epoll1 and poll polling strategies
dd7266e1dd24cf4a016c10a7effc1c50868af5bdb7e8af945ad815a37f410bad
Traceback (most recent call last):
File "main.py", line 133, in <module>
run(
File "/home/<path>beam-pipeline/kafka2gcp/pipeline.py", line 170, in run
(
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/pipeline.py",
line 596, in __exit__
self.result = self.run()
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/pipeline.py",
line 573, in run
return self.runner.run_pipeline(self, self._options)
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 199, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 208, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 408, in run_stages
bundle_results = self._execute_bundle(
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 736, in _execute_bundle
self._run_bundle(
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 966, in _run_bundle
result, splits = bundle_manager.process_bundle(
File
"/home/<path>beam-pipeline/external/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1315, in process_bundle
raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Unable to encode element
'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466' with coder
'SchemaCoder<Schema: Fields:
Field\{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field\{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=key, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value
BYTES NOT NULL> NOT NULL> NOT NULL, options={{}}}
Field\{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{headers=6, timestampTypeName=8, partition=1, offset=2, topic=0, value=5,
key=4, timestamp=3, timestampTypeId=7}
Options:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d UUID:
99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder:
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03'.
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1763)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)
at
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)
at
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
at
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)
at
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
at
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
at
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)
at
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)
at
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)
at
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
at
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
at
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
at
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element
'org.apache.beam.sdk.io.kafka.KafkaIO$ByteArrayKafkaRecord@9318466' with coder
'SchemaCoder<Schema: Fields:
Field\{name=topic, description=, type=STRING NOT NULL, options={{}}}
Field\{name=partition, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=offset, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=timestamp, description=, type=INT64 NOT NULL, options={{}}}
Field\{name=key, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}
Field\{name=headers, description=, type=ARRAY<ROW<key STRING NOT NULL, value
BYTES NOT NULL> NOT NULL> NOT NULL, options={{}}}
Field\{name=timestampTypeId, description=, type=INT32 NOT NULL, options={{}}}
Field\{name=timestampTypeName, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{headers=6, timestampTypeName=8, partition=1, offset=2, topic=0, value=5,
key=4, timestamp=3, timestampTypeId=7}
Options:\{{}}UUID: 99952ae5-e379-4f4c-adfa-5052a8d4757d UUID:
99952ae5-e379-4f4c-adfa-5052a8d4757d delegateCoder:
org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI@33b76b03'.
at
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
at
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
Caused by: java.lang.RuntimeException: Null value set on non-nullable field
Field\{name=value, description=, type=BYTES NOT NULL, options={{}}}
at
org.apache.beam.sdk.values.RowWithGetters.getValue(RowWithGetters.java:76)
at
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:341)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown
Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$J59aZwdI.encode(Unknown
Source)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
at
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:374)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:248)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1.processElement(KafkaIO.java:1768)
at
org.apache.beam.sdk.io.kafka.KafkaIO$RowsWithMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)
at
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
at
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn.process(BoundedReadFromUnboundedSource.java:206)
at
org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$ReadFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
at
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
at
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1.processElement(Reify.java:206)
at
org.apache.beam.sdk.transforms.Reify$ExtractTimestampsFromValues$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1.process(ReifyTimestamps.java:70)
at
org.apache.beam.sdk.transforms.ReifyTimestamps$RemoveWildcard$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1760)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2311)
at
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2481)
at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
at
org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:103)
at
org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:773)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
at
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
at
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
at
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
at
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
at
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
at
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
at
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
{quote}
> Kafka XLang fails for ?empty? key/values
> ----------------------------------------
>
> Key: BEAM-10529
> URL: https://issues.apache.org/jira/browse/BEAM-10529
> Project: Beam
> Issue Type: Bug
> Components: cross-language, io-java-kafka
> Reporter: Luke Cwik
> Assignee: John Casey
> Priority: P1
> Fix For: 2.38.0
>
> Time Spent: 25h 20m
> Remaining Estimate: 0h
>
> It looks like the Javadoc for ByteArrayDeserializer and StringDeserializer
> can return null[1, 2] and we aren't using
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that KafkaIO
> does this correctly in its regular coder inference logic[4].
> 1:
> [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-|https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-2:]
> [2:|https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-2:]
>
> [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-]
> 3:
> [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478]
> 4:
> [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85]
--
This message was sent by Atlassian Jira
(v8.20.1#820001)