[
https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=296665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-296665
]
ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Aug/19 23:43
Start Date: 16/Aug/19 23:43
Worklog Time Spent: 10m
Work Description: chadrik commented on issue #9268: [BEAM-7738] Add external transform support to PubsubIO
URL: https://github.com/apache/beam/pull/9268#issuecomment-522181883
Here's some info on where I am with this. I could really use some help to
push this over the finish line.
The expansion service runs correctly and sends the expanded transforms back to
Python, but the job fails inside Java on Flink because it tries to use the
wrong serializer. There's a good chance that I'm overlooking something very
obvious.
Here's the stack trace:
```
2019-08-16 16:04:58,297 INFO org.apache.flink.runtime.taskmanager.Task - Source: PubSubInflow/ExternalTransform(beam:external:java:pubsub:read:v1)/PubsubUnboundedSource/Read(PubsubSource) (1/1) (93e578d8ae737c7502685cd978413a98) switched from RUNNING to FAILED.
java.lang.RuntimeException: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be cast to [B
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:342)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:268)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
    at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be cast to [B
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
    at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
    at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:569)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:87)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:160)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 15 more
```
Here's the coder that `CoderTypeSerializer.serialize` is using when the
exception is thrown:
```
WindowedValue$FullWindowedValueCoder(
ValueWithRecordId$ValueWithRecordIdCoder(
LengthPrefixCoder(
ByteArrayCoder)
),
GlobalWindow$Coder
)
```
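As a sanity check, here's how that coder stack can be built explicitly (a standalone sketch, not code from the PR; the class name is mine). The point is that its element type is `byte[]` wrapped in `ValueWithRecordId`, not `PubsubMessage`:
```java
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;

public class ExpectedCoderSketch {
  public static void main(String[] args) {
    // The same nesting as in the log above. Note the element type this
    // coder can encode: byte[], not PubsubMessage.
    Coder<WindowedValue<ValueWithRecordId<byte[]>>> coder =
        WindowedValue.getFullCoder(
            ValueWithRecordId.ValueWithRecordIdCoder.of(
                LengthPrefixCoder.of(ByteArrayCoder.of())),
            GlobalWindow.Coder.INSTANCE);
    System.out.println(coder);  // prints roughly the same nesting as the log
  }
}
```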
Here's the value that's being serialized:
```
TimestampedValueInGlobalWindow{
value=ValueWithRecordId{
id=[54, 56, 54, 48, 48, 52, 48, 49, 49, 57, 55, 54, 52, 53, 48],
value=org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage@64d3b2a
},
timestamp=2019-08-16T20:04:29.230Z,
pane=PaneInfo.NO_FIRING
}
```
Obviously `ByteArrayCoder` is not the right coder for `PubsubMessage`.
Note that I get the same error whether `with_attributes` is enabled or
disabled.
So why is `ByteArrayCoder` being used?
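For what it's worth, the failure mode is exactly what you'd expect when a raw `Coder` is handed the wrong element type: the cast to `byte[]` only happens inside `ByteArrayCoder.encode`. A minimal standalone sketch (not code from the PR; the message contents are made up) reproduces the same `ClassCastException`:
```java
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

public class CoderMismatchSketch {
  @SuppressWarnings({"rawtypes", "unchecked"})
  public static void main(String[] args) throws Exception {
    // The runner-side plumbing only sees a raw Coder, so handing it the
    // wrong element type compiles fine...
    Coder rawCoder = ByteArrayCoder.of();
    Object element = new PubsubMessage("payload".getBytes(), Collections.emptyMap());
    // ...and fails at runtime inside ByteArrayCoder.encode with
    // "PubsubMessage cannot be cast to [B", as in the stack trace above.
    rawCoder.encode(element, new ByteArrayOutputStream());
  }
}
```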
The graph that's generated after expansion looks right to me. Here's a
table of the primitive transforms and their output PCollections (note that the
last row is a Python ParDo).
| transform name | transform type | out collection | out collection coder |
|----------------|----------------|----------------|-----------------------|
| external_1root/PubsubUnboundedSource/Read(PubsubSource) | beam:transform:read:v1 | PubsubMessage | PubsubMessageWithAttributesCoder |
| external_1root/PubsubUnboundedSource/PubsubUnboundedSource.Stats/ParMultiDo(Stats) | beam:transform:pardo:v1 of StatsFn | PubsubMessage | PubsubMessageWithAttributesCoder |
| external_1root/MapElements/Map/ParMultiDo(Anonymous) | beam:transform:pardo:v1 of ParsePayloadAsPubsubMessageProto | byte[] | ByteArrayCoder |
| ref_AppliedPTransform_PubSubInflow/Map(_from_proto_str)_4 | beam:transform:pardo:v1 of _from_proto_str | PubsubMessage | Pickle |
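(If anyone wants to double-check the table, one way to dump the PCollection-to-coder mapping is to walk the pipeline proto; a sketch, assuming you already have the `RunnerApi.Pipeline` proto in hand, with names of my own choosing:)
```java
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;

public class DumpPCollectionCoders {
  // Prints each PCollection id together with its coder id,
  // similar to the table above.
  static void dump(RunnerApi.Pipeline pipelineProto) {
    for (Map.Entry<String, RunnerApi.PCollection> entry :
        pipelineProto.getComponents().getPcollectionsMap().entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().getCoderId());
    }
  }
}
```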
So as far as the pipeline definition is concerned, the output of
`Read(PubsubSource)` is properly associated with
`PubsubMessageWithAttributesCoder`, but when the job runs it uses
`ByteArrayCoder` instead. So maybe the wires are somehow getting crossed with
`ParMultiDo(Anonymous)`, which uses `ByteArrayCoder`.
Any ideas?
The last thing that's important to note is that I needed `PubsubIO.Read` to
output a data type that is compatible with Python, so I wrote a custom parseFn
that converts each message to a protobuf byte array. The PCollection that
bridges the divide between Java and Python therefore uses a BytesCoder, and the
transforms on either side do the work of converting to and from the protobuf
representation of a PubsubMessage.
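The parseFn is along these lines (a simplified sketch, not the exact code from the PR; it mirrors the ParsePayloadAsPubsubMessageProto step in the table above):
```java
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.SimpleFunction;

// Re-encodes the Beam PubsubMessage as the wire format of the standard
// com.google.pubsub.v1.PubsubMessage proto, which the Python side can decode.
class ParsePayloadAsPubsubMessageProto extends SimpleFunction<PubsubMessage, byte[]> {
  @Override
  public byte[] apply(PubsubMessage input) {
    com.google.pubsub.v1.PubsubMessage.Builder message =
        com.google.pubsub.v1.PubsubMessage.newBuilder()
            .setData(ByteString.copyFrom(input.getPayload()));
    if (input.getAttributeMap() != null) {
      message.putAllAttributes(input.getAttributeMap());
    }
    return message.build().toByteArray();
  }
}
```
It's applied with `MapElements.via(...)`, which is the `MapElements/Map/ParMultiDo(Anonymous)` row in the table; the Python `_from_proto_str` ParDo in the last row does the inverse conversion.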
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 296665)
Time Spent: 2h (was: 1h 50m)
> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
> Key: BEAM-7738
> URL: https://issues.apache.org/jira/browse/BEAM-7738
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp, runner-flink, sdk-py-core
> Reporter: Chad Dombrova
> Assignee: Chad Dombrova
> Priority: Major
> Labels: portability
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029) we
> should add support for PubSub.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)