[
https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498
]
Mark Liu edited comment on BEAM-5953 at 11/13/18 12:09 AM:
-----------------------------------------------------------
To verify that the problem comes from a corrupted proto file, I did two
things: 1) dumped the proto into a local file before staging and read it back
the same way DataflowWorkerHarnessHelper.java does; the proto printed
successfully (a minimal sketch of this check is at the end of this comment).
2) replaced the existing upload approach ([this
line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523])
with the google-cloud-storage library:
{code:python}
# Upload pipeline.pb via google-cloud-storage instead of apitools.
import io
import logging
import tempfile

from apache_beam.io.filesystems import FileSystems
from google.cloud import storage

temp_file = tempfile.NamedTemporaryFile()
with open(temp_file.name, 'wb') as f:
  f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read())

gcs_location = FileSystems.join(job.google_cloud_options.staging_location,
                                'pipeline.pb')
# Strip the 'gs://' prefix and split into bucket name and object name.
bucket_name, name = gcs_location[5:].split('/', 1)

logging.info('Starting GCS upload to %s...', gcs_location)
client = storage.Client(project='my-project')
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(name)
blob.upload_from_filename(temp_file.name)
{code}
With the upload change in place, the runner harness started successfully; see [this
job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe].
This could be another case in support of moving off google-apitools in the SDK
(https://issues.apache.org/jira/browse/BEAM-4850).
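For reference, step 1 amounted to a round-trip check of the serialized pipeline
proto. A minimal sketch of that check in Python (DataflowWorkerHarnessHelper.java
does the read side in Java; the local path here is illustrative):
{code:python}
# Round-trip check: dump the pipeline proto to a local file, read it back,
# and parse it to confirm the serialized bytes are intact before staging.
from apache_beam.portability.api import beam_runner_api_pb2

local_path = '/tmp/pipeline.pb'  # illustrative path
with open(local_path, 'wb') as f:
  f.write(job.proto_pipeline.SerializeToString())

with open(local_path, 'rb') as f:
  parsed = beam_runner_api_pb2.Pipeline()
  parsed.ParseFromString(f.read())

# If parsing succeeds, the proto prints without error.
print(parsed)
{code}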
> Support DataflowRunner on Python 3
> ----------------------------------
>
> Key: BEAM-5953
> URL: https://issues.apache.org/jira/browse/BEAM-5953
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>