Here is a test job that sometimes fails and sometimes doesn't (though most
of the time it does fail).
There seems to be something stochastic causing this, as after several test
runs a couple of them did succeed.


def test_error(
    bq_table: str) -> bool:
    """Submit a small Dataflow pipeline that writes generated rows to BigQuery.

    Reproduces the intermittent dill `KeyError: 'ClassType'` unpickling
    failure discussed in this thread: a trivial DoFn fans out 20000 identical
    rows which are written to the given table.

    Args:
        bq_table: Fully-qualified table reference in the form
            'project:dataset.table'.

    Returns:
        True once the pipeline run has finished.
    """
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class GenData(beam.DoFn):
        # The single empty input element only triggers the fan-out; every
        # emitted row is identical.
        def process(self, _):
            for _ in range(20000):
                yield {'a': 1, 'b': 2}

    def get_bigquery_schema():
        from apache_beam.io.gcp.internal.clients import bigquery

        table_schema = bigquery.TableSchema()
        # (name, type, mode) for each column of the target table.
        columns = [
            ('a', 'integer', 'nullable'),
            ('b', 'integer', 'nullable'),
        ]
        for name, col_type, mode in columns:
            column_schema = bigquery.TableFieldSchema()
            column_schema.name = name
            column_schema.type = col_type
            column_schema.mode = mode
            table_schema.fields.append(column_schema)

        return table_schema

    # Parse 'project:dataset.table' once instead of re-splitting the string
    # for every WriteToBigQuery argument.
    project, _, rest = bq_table.partition(':')
    dataset, _, table = rest.partition('.')

    pipeline = beam.Pipeline(options=PipelineOptions(
        project='my-project',
        temp_location='gs://my-bucket/temp',
        staging_location='gs://my-bucket/staging',
        runner='DataflowRunner',
    ))

    (
        pipeline
        | 'Empty start' >> beam.Create([''])
        | 'Generate Data' >> beam.ParDo(GenData())
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            project=project,
            dataset=dataset,
            table=table,
            schema=get_bigquery_schema(),
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )

    result = pipeline.run()
    # Block until the remote Dataflow job completes (or fails).
    result.wait_until_finish()

    return True

# Kick off the repro pipeline against the target table
# (format: 'project:dataset.table').
test_error(
    bq_table = 'my-project:my_dataset.my_table'
)

On Tue, Feb 4, 2020 at 10:04 AM Alan Krumholz <alan.krumh...@betterup.co>
wrote:

> I tried breaking apart my pipeline. Seems the step that breaks it is:
> beam.io.WriteToBigQuery
>
> Let me see if I can create a self contained example that breaks to share
> with you
>
> Thanks!
>
> On Tue, Feb 4, 2020 at 9:53 AM Pablo Estrada <pabl...@google.com> wrote:
>
>> Hm that's odd. No changes to the pipeline? Are you able to share some of
>> the code?
>>
>> +Udi Meiri <eh...@google.com> do you have any idea what could be going
>> on here?
>>
>> On Tue, Feb 4, 2020 at 9:25 AM Alan Krumholz <alan.krumh...@betterup.co>
>> wrote:
>>
>>> Hi Pablo,
>>> This is strange... it doesn't seem to be the last beam release as last
>>> night it was already using 2.19.0 I wonder if it was some release from the
>>> DataFlow team (not beam related):
>>> Job typeBatch
>>> Job status Succeeded
>>> SDK version
>>> Apache Beam Python 3.5 SDK 2.19.0
>>> Region
>>> us-central1
>>> Start timeFebruary 3, 2020 at 9:28:35 PM GMT-8
>>> Elapsed time5 min 11 sec
>>>
>>> On Tue, Feb 4, 2020 at 9:15 AM Pablo Estrada <pabl...@google.com> wrote:
>>>
>>>> Hi Alan,
>>>> could it be that you're picking up the new Apache Beam 2.19.0 release?
>>>> Could you try depending on beam 2.18.0 to see if the issue surfaces when
>>>> using the new release?
>>>>
>>>> If something was working and no longer works, it sounds like a bug.
>>>> This may have to do with how we pickle (dill / cloudpickle) - see this
>>>> question
>>>> https://stackoverflow.com/questions/42960637/python-3-5-dill-pickling-unpickling-on-different-servers-keyerror-classtype
>>>> Best
>>>> -P.
>>>>
>>>> On Tue, Feb 4, 2020 at 6:22 AM Alan Krumholz <alan.krumh...@betterup.co>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was running a dataflow job in GCP last night and it was running fine.
>>>>> This morning this same exact job is failing with the following error:
>>>>>
>>>>> Error message from worker: Traceback (most recent call last): File
>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>>> line 286, in loads return dill.loads(s) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>>> return load(file, ignore, **kwds) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>>> obj = StockUnpickler.load(self) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType' During
>>>>> handling of the above exception, another exception occurred: Traceback
>>>>> (most recent call last): File
>>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/batchworker.py",
>>>>> line 648, in do_work work_executor.execute() File
>>>>> "/usr/local/lib/python3.5/site-packages/dataflow_worker/executor.py", line
>>>>> 176, in execute op.start() File 
>>>>> "apache_beam/runners/worker/operations.py",
>>>>> line 649, in apache_beam.runners.worker.operations.DoOperation.start File
>>>>> "apache_beam/runners/worker/operations.py", line 651, in
>>>>> apache_beam.runners.worker.operations.DoOperation.start File
>>>>> "apache_beam/runners/worker/operations.py", line 652, in
>>>>> apache_beam.runners.worker.operations.DoOperation.start File
>>>>> "apache_beam/runners/worker/operations.py", line 261, in
>>>>> apache_beam.runners.worker.operations.Operation.start File
>>>>> "apache_beam/runners/worker/operations.py", line 266, in
>>>>> apache_beam.runners.worker.operations.Operation.start File
>>>>> "apache_beam/runners/worker/operations.py", line 597, in
>>>>> apache_beam.runners.worker.operations.DoOperation.setup File
>>>>> "apache_beam/runners/worker/operations.py", line 602, in
>>>>> apache_beam.runners.worker.operations.DoOperation.setup File
>>>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>>>> line 290, in loads return dill.loads(s) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 275, in loads
>>>>> return load(file, ignore, **kwds) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 270, in load
>>>>> return Unpickler(file, ignore=ignore, **kwds).load() File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 472, in load
>>>>> obj = StockUnpickler.load(self) File
>>>>> "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 577, in
>>>>> _load_type return _reverse_typemap[name] KeyError: 'ClassType'
>>>>>
>>>>>
>>>>> If I use a local runner it still runs fine.
>>>>> Anyone else experiencing something similar today? (or know how to fix
>>>>> this?)
>>>>>
>>>>> Thanks!
>>>>>
>>>>

Reply via email to