Roman Frigg created BEAM-10524:
----------------------------------

             Summary: Default decoder for ReadFromBigQuery does not support 
repeatable fields
                 Key: BEAM-10524
                 URL: https://issues.apache.org/jira/browse/BEAM-10524
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Roman Frigg


The code in 
[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L570]

does not handle fields with mode "REPEATED" correctly. Running a query whose 
results contain repeated fields, which are represented as JSON arrays, fails 
with the following stack trace:
{noformat}
...
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/concat_source.py", line 89, in read
    range_tracker.sub_range_tracker(source_ix)):
  File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/textio.py", line 210, in read_records
    yield self._coder.decode(record)
  File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 566, in decode
    return self._decode_with_schema(value, self.fields)
  File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 580, in _decode_with_schema
    value[field.name], field.fields)
  File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 575, in _decode_with_schema
    value[field.name] = None
TypeError: list indices must be integers or slices, not str
{noformat}
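
The failure mode can be reproduced outside of Beam. The snippet below is only a sketch: the helper name `fill_missing_fields` and the sample row are hypothetical and merely mirror the pattern from the trace, where the list held by a repeated RECORD field is passed on as if it were a single record (a dict).

```python
# Hypothetical row, shaped like a JSON export with a repeated RECORD field.
row = {"id": "1", "tags": [{"label": "a"}, {"label": "b"}]}


def fill_missing_fields(value, field_names):
    """Set missing fields to None, assuming `value` is a single record (dict)."""
    for name in field_names:
        if name not in value:
            # For a list, the membership test above is True for any field
            # name, so this assignment indexes a list with a str.
            value[name] = None
    return value


try:
    # row["tags"] is a list of records, not one record.
    fill_missing_fields(row["tags"], ["label", "score"])
    message = None
except TypeError as exc:
    message = str(exc)

print(message)
```

The membership test `name not in value` succeeds on a list without complaint, which is why the error only surfaces at the assignment, as in the last frame of the trace.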
 

The fix could look something like this (untested):
{code:python}
def _decode_with_schema(self, value, schema_fields):
    for field in schema_fields:
        if field.name not in value:
            # The field exists in the schema, but it doesn't exist in this row.
            # It probably means its value was null, as the extract to JSON job
            # doesn't preserve null fields
            value[field.name] = None
            continue

        if field.type == 'RECORD':
            if field.mode == 'REPEATED':
                value[field.name] = [self._decode_with_schema(val, field.fields)
                                     for val in value[field.name]]
            else:
                value[field.name] = self._decode_with_schema(value[field.name],
                                                             field.fields)
        else:
            try:
                converter = self._converters[field.type]
            except KeyError:
                # No need to do any conversion
                continue

            if field.mode == 'REPEATED':
                value[field.name] = [converter(val) for val in value[field.name]]
            else:
                value[field.name] = converter(value[field.name])
    return value
{code}
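
To sanity-check the idea without a Beam installation, the same logic can be exercised standalone. This is a rough sketch under assumptions: `Field` is a hypothetical stand-in for the schema field objects, and `CONVERTERS` is an assumed converter table, not the one used by the actual coder.

```python
from collections import namedtuple

# Hypothetical stand-in for the schema field objects (name, type, mode, fields).
Field = namedtuple("Field", ["name", "type", "mode", "fields"])

# Assumed converter table for illustration; the real coder's table may differ.
CONVERTERS = {"INTEGER": int, "FLOAT": float}


def decode_with_schema(value, schema_fields):
    """Decode one record in place, handling REPEATED scalars and records."""
    for field in schema_fields:
        if field.name not in value:
            # Field is in the schema but missing from the row: treat as null.
            value[field.name] = None
            continue

        if field.type == "RECORD":
            if field.mode == "REPEATED":
                # Decode each nested record of the list separately.
                value[field.name] = [
                    decode_with_schema(val, field.fields)
                    for val in value[field.name]
                ]
            else:
                value[field.name] = decode_with_schema(
                    value[field.name], field.fields)
            continue

        converter = CONVERTERS.get(field.type)
        if converter is None:
            continue  # No conversion needed for this type.

        if field.mode == "REPEATED":
            value[field.name] = [converter(val) for val in value[field.name]]
        else:
            value[field.name] = converter(value[field.name])
    return value


schema = [
    Field("id", "INTEGER", "NULLABLE", ()),
    Field("scores", "FLOAT", "REPEATED", ()),
    Field("tags", "RECORD", "REPEATED", (
        Field("label", "STRING", "NULLABLE", ()),
        Field("weight", "INTEGER", "NULLABLE", ()),
    )),
]
row = {
    "id": "7",
    "scores": ["1.5", "2"],
    "tags": [{"label": "a", "weight": "3"}, {"label": "b"}],
}
decoded = decode_with_schema(row, schema)
print(decoded)
```

The repeated scalar values are collected into a list rather than a lazy iterator, so the decoded row can be read more than once and serialized.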



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
