Deepak Verma created BEAM-9417:
----------------------------------
Summary: Unable to read from BigQuery and the file system in the same pipeline
Key: BEAM-9417
URL: https://issues.apache.org/jira/browse/BEAM-9417
Project: Beam
Issue Type: Bug
Components: io-py-gcp
Environment: MacBook Pro, macOS Catalina, Python 3.7, apache-beam[gcp]===2.19.0
Reporter: Deepak Verma
I am trying to read from BigQuery and the local file system in the same apache-beam[gcp] pipeline.
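{code:python}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# PreProcessOptions (a custom PipelineOptions subclass defining bq_project and
# customer) is user-defined and not shown; UnicodeCoder is presumably a custom coder too.
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)

apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'".format(
    bq_project=options.bq_project, customer=options.customer)
file_path = "mycsv.csv.gz"

# Two independent reads in one pipeline: BigQuery and a local gzipped CSV file.
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query,
                                              use_standard_sql=True))
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

# Running the pipeline is what raises the TypeError in the traceback.
p.run().wait_until_finish()
{code}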
When I run this job, I get the following error:
{code:java}
Traceback (most recent call last):
  File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
    run()
  File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
    p.run().wait_until_finish()
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)
{code}
But if I run my code like this
{code:python}
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
file_path = "mycsv.csv.gz"
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
{code}
or like this
{code:python}
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'".format(
    bq_project=options.bq_project, customer=options.customer)
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query,
                                              use_standard_sql=True))
{code}
or even like this
{code:python}
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'".format(
    bq_project=options.bq_project, customer=options.customer)
# Beam rejects two transforms with the same label, and both unlabeled reads would
# default to "Read", so each application gets its own (illustrative) label here.
apn = p | "ReadApn1" >> beam.io.Read(beam.io.BigQuerySource(query=apn_query,
                                                            use_standard_sql=True))
apn = p | "ReadApn2" >> beam.io.Read(beam.io.BigQuerySource(query=apn_query,
                                                            use_standard_sql=True))
{code}
the code works fine.
Is it a limitation of Apache Beam that one pipeline cannot read from both BigQuery and the file system?
If so, can we add this feature?
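For context on the error itself: the last two frames show sdf_direct_runner.py (line 87) calling DoFnInvoker.create_invoker(signature, process_invocation=False) with a single positional argument, while the compiled apache_beam.runners.common module reports that create_invoker() takes at least two. That looks like a call-signature mismatch inside the DirectRunner's transform-override path rather than a deliberate limit on the number of sources, although this is an inference from the traceback, not a confirmed diagnosis. The pure-Python sketch below reproduces only the shape of that mismatch; create_invoker here is a hypothetical stand-in, and the name of its second parameter, output_processor, is an assumption rather than Beam's verified signature.

{code:python}
def create_invoker(signature, output_processor, process_invocation=True):
    """Hypothetical stand-in that, like the compiled function in the
    traceback, requires at least two positional arguments."""
    return (signature, output_processor, process_invocation)


def call_like_sdf_direct_runner(signature):
    """Mirrors the shape of the call at sdf_direct_runner.py line 87:
    only `signature` is passed positionally."""
    return create_invoker(signature, process_invocation=False)


if __name__ == "__main__":
    try:
        call_like_sdf_direct_runner("sig")
    except TypeError as exc:
        # Pure Python words this differently from the Cython-compiled module,
        # but it is the same kind of error: a required positional argument is missing.
        print("TypeError:", exc)
{code}

Supplying the second positional argument, as in create_invoker("sig", None, process_invocation=False), makes the stand-in call succeed.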
--
This message was sent by Atlassian Jira
(v8.3.4#803005)