[ https://issues.apache.org/jira/browse/BEAM-10996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208270#comment-17208270 ]
Ning Kang commented on BEAM-10996:
----------------------------------
First, to clarify: this is not related to the notebook. The pipeline fails no
matter where it runs.
Second, there are 2 problems with the pipeline:
1. When using `beam.FlatMap`, each of the 3 serialized byte strings is
   flattened into dozens of integers, because iterating over a `bytes` object in
   Python 3 yields `int`s. That is why you get the assertion error: the elements
   of the PCollection are now of type `int`. You should use `beam.Map` instead
   of `beam.FlatMap`. If the flattening is intended, you have to convert the int
   elements into strings, for example by appending another transform,
   `beam.Map(lambda x: str(x))`.
2. The output of `SerializeToString()` is not decodable by Beam. You can wrap
   it with `return base64.b64encode(tfexample.SerializeToString()).decode('utf-8')`.
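The `FlatMap` versus `Map` difference described above can be reproduced without Beam or TensorFlow. This is a minimal sketch: `flat_map` and `map_` are hypothetical helpers that only mimic the element-count semantics of `beam.FlatMap` and `beam.Map`, and the three byte strings are made-up stand-ins for `tfexample.SerializeToString()` output, not real serialized Examples.

```python
import base64

# Made-up stand-ins for tfexample.SerializeToString() output (hypothetical data).
serialized = [b"\n\x04abcd", b"\n\x04efgh", b"\n\x04ijkl"]


def flat_map(fn, elements):
    """Hypothetical approximation of beam.FlatMap: every item fn returns becomes its own element."""
    return [item for element in elements for item in fn(element)]


def map_(fn, elements):
    """Hypothetical approximation of beam.Map: exactly one output element per input element."""
    return [fn(element) for element in elements]


def identity(x):
    return x


# Iterating over bytes yields ints, so FlatMap turns 3 records into 18 ints.
flat = flat_map(identity, serialized)
print(len(flat), type(flat[0]).__name__, flat[0])  # 18 int 10

# Map keeps each record intact as a single bytes element.
mapped = map_(identity, serialized)
print(len(mapped), type(mapped[0]).__name__)  # 3 bytes

# The base64 variant from the comment: each record becomes an ASCII str.
encoded = map_(lambda b: base64.b64encode(b).decode("utf-8"), serialized)
```

Note that the base64 variant produces `str` elements rather than `bytes`, so whatever consumes them downstream has to expect text.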
> AssertionError: (10, <class 'int'>) when writing TF Records
> -----------------------------------------------------------
>
> Key: BEAM-10996
> URL: https://issues.apache.org/jira/browse/BEAM-10996
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.22.0
> Reporter: Valliappa Lakshmanan
> Assignee: Ning Kang
> Priority: P1
> Attachments: repro.py
>
>
> This code snippet:
>
> def create_tfrecord(x):
>     size = np.array([2.0, 3.0])
>     tfexample = tf.train.Example(
>         features=tf.train.Features(
>             feature={
>                 'size': tf.train.Feature(float_list=tf.train.FloatList(value=size))
>             }))
>     return tfexample.SerializeToString()
>
> ...
> beam.FlatMap(lambda x: create_tfrecord(x))
> ...
>
> throws this error:
>
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
>     self.writer.write(element)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
>     self.sink.write_record(self.temp_handle, value)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 146, in write_record
>     self.write_encoded_record(file_handle, self.coder.encode(value))
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 463, in encode
>     return self.get_impl().encode(value)
>   File "apache_beam/coders/coder_impl.py", line 494, in apache_beam.coders.coder_impl.BytesCoderImpl.encode
>   File "apache_beam/coders/coder_impl.py", line 495, in apache_beam.coders.coder_impl.BytesCoderImpl.encode
> AssertionError: (10, <class 'int'>)
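The last frames of the traceback above are in Beam's `BytesCoderImpl.encode`, which rejects any element that is not `bytes`. The sketch below assumes the check has the shape `assert isinstance(value, bytes), (value, type(value))`, which matches the error text; `bytes_coder_encode` is a hypothetical stand-in, not Beam's API. It also shows why the reported value is `10`: the first byte of a serialized `tf.train.Example` is `0x0A`, the protobuf tag of its `features` field, and `FlatMap` emits that byte as the integer 10.

```python
def bytes_coder_encode(value):
    """Hypothetical stand-in for the type check in Beam's BytesCoderImpl.encode."""
    assert isinstance(value, bytes), (value, type(value))
    return value


# Made-up stand-in for tfexample.SerializeToString(); only the leading 0x0A
# byte matches what a real serialized Example starts with.
serialized = b"\n\x04abcd"

# With beam.Map the whole record reaches the coder and passes the check.
accepted = bytes_coder_encode(serialized)

# With beam.FlatMap each byte arrives on its own, as an int, and fails.
message = None
try:
    for element in serialized:
        bytes_coder_encode(element)
except AssertionError as err:
    message = str(err)

print(message)  # (10, <class 'int'>)
```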
--
This message was sent by Atlassian Jira
(v8.3.4#803005)