[
https://issues.apache.org/jira/browse/BEAM-5462?focusedWorklogId=161678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-161678
]
ASF GitHub Bot logged work on BEAM-5462:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Nov/18 18:18
Start Date: 01/Nov/18 18:18
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #6834: [BEAM-5462] Fix
<pipeline>.options in game example since it's deprecated
URL: https://github.com/apache/beam/pull/6834
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (forked) pull request once it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index 57664a15589..5f7a0ed66ee 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -159,17 +159,19 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
 class WriteToBigQuery(beam.PTransform):
   """Generate, format, and write BigQuery table row information."""
-  def __init__(self, table_name, dataset, schema):
+  def __init__(self, table_name, dataset, schema, project):
     """Initializes the transform.
     Args:
       table_name: Name of the BigQuery table to use.
       dataset: Name of the dataset to use.
       schema: Dictionary in the format {'column_name': 'bigquery_type'}
+      project: Name of the Cloud project containing BigQuery table.
     """
     super(WriteToBigQuery, self).__init__()
     self.table_name = table_name
     self.dataset = dataset
     self.schema = schema
+    self.project = project
   def get_schema(self):
     """Build the output table schema."""
@@ -182,13 +184,12 @@ def get_schema(self):
         '%s:%s' % (col, self.schema[col]) for col in self.schema)
   def expand(self, pcoll):
-    project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
     return (
         pcoll
         | 'ConvertToRow' >> beam.Map(
            lambda elem: {col: elem[col] for col in self.schema})
         | beam.io.WriteToBigQuery(
-            self.table_name, self.dataset, project, self.get_schema()))
+            self.table_name, self.dataset, self.project, self.get_schema()))
# [START abuse_detect]
@@ -354,7 +355,7 @@ def run(argv=None):
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
- }))
+ }, options.view_as(GoogleCloudOptions).project))
# [START session_calc]
# Detect user sessions-- that is, a burst of activity separated by a gap
@@ -388,7 +389,7 @@ def run(argv=None):
| 'WriteAvgSessionLength' >> WriteToBigQuery(
args.table_name + '_sessions', args.dataset, {
'mean_duration': 'FLOAT',
- }))
+ }, options.view_as(GoogleCloudOptions).project))
# [END rewindow]
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index d736cf50fce..c46559aa383 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -159,17 +159,19 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
 class WriteToBigQuery(beam.PTransform):
   """Generate, format, and write BigQuery table row information."""
-  def __init__(self, table_name, dataset, schema):
+  def __init__(self, table_name, dataset, schema, project):
     """Initializes the transform.
     Args:
       table_name: Name of the BigQuery table to use.
       dataset: Name of the dataset to use.
       schema: Dictionary in the format {'column_name': 'bigquery_type'}
+      project: Name of the Cloud project containing BigQuery table.
     """
     super(WriteToBigQuery, self).__init__()
     self.table_name = table_name
     self.dataset = dataset
     self.schema = schema
+    self.project = project
   def get_schema(self):
     """Build the output table schema."""
@@ -177,13 +179,12 @@ def get_schema(self):
         '%s:%s' % (col, self.schema[col]) for col in self.schema)
   def expand(self, pcoll):
-    project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
     return (
         pcoll
         | 'ConvertToRow' >> beam.Map(
            lambda elem: {col: elem[col] for col in self.schema})
         | beam.io.WriteToBigQuery(
-            self.table_name, self.dataset, project, self.get_schema()))
+            self.table_name, self.dataset, self.project, self.get_schema()))
# [START main]
@@ -291,7 +292,7 @@ def run(argv=None):
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
- }))
+ }, options.view_as(GoogleCloudOptions).project))
# [END main]
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index b114b1321b7..cde1544baef 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -168,17 +168,19 @@ def process(self, team_score, window=beam.DoFn.WindowParam):
 class WriteToBigQuery(beam.PTransform):
   """Generate, format, and write BigQuery table row information."""
-  def __init__(self, table_name, dataset, schema):
+  def __init__(self, table_name, dataset, schema, project):
     """Initializes the transform.
     Args:
       table_name: Name of the BigQuery table to use.
       dataset: Name of the dataset to use.
       schema: Dictionary in the format {'column_name': 'bigquery_type'}
+      project: Name of the Cloud project containing BigQuery table.
     """
     super(WriteToBigQuery, self).__init__()
     self.table_name = table_name
     self.dataset = dataset
     self.schema = schema
+    self.project = project
   def get_schema(self):
     """Build the output table schema."""
@@ -186,13 +188,12 @@ def get_schema(self):
         '%s:%s' % (col, self.schema[col]) for col in self.schema)
   def expand(self, pcoll):
-    project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
     return (
         pcoll
         | 'ConvertToRow' >> beam.Map(
            lambda elem: {col: elem[col] for col in self.schema})
         | beam.io.WriteToBigQuery(
-            self.table_name, self.dataset, project, self.get_schema()))
+            self.table_name, self.dataset, self.project, self.get_schema()))
# [START window_and_trigger]
@@ -329,7 +330,7 @@ def run(argv=None):
'total_score': 'INTEGER',
'window_start': 'STRING',
'processing_time': 'STRING',
- }))
+ }, options.view_as(GoogleCloudOptions).project))
def format_user_score_sums(user_score):
(user, score) = user_score
@@ -343,7 +344,7 @@ def format_user_score_sums(user_score):
args.table_name + '_users', args.dataset, {
'user': 'STRING',
'total_score': 'INTEGER',
- }))
+ }, options.view_as(GoogleCloudOptions).project))
if __name__ == '__main__':
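For context on the change itself: the deprecation warning comes from reading pcoll.pipeline.options inside expand(), so the PR resolves the project once from the PipelineOptions object built in run() and passes it into the transform's constructor. A minimal, self-contained sketch of that pattern (the WriteRows transform and its table naming are illustrative placeholders, not part of the Beam examples):

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions


class WriteRows(beam.PTransform):
  """Hypothetical transform that needs the GCP project at construction time."""

  def __init__(self, table_name, dataset, project):
    super(WriteRows, self).__init__()
    self.table_name = table_name
    self.dataset = dataset
    # The project is injected by the caller instead of being read from
    # pcoll.pipeline.options inside expand(), which is the deprecated path.
    self.project = project

  def expand(self, pcoll):
    # Illustrative only: build a destination string from the injected values.
    table_spec = '%s:%s.%s' % (self.project, self.dataset, self.table_name)
    return pcoll | 'TagDestination' >> beam.Map(
        lambda row, spec=table_spec: (spec, row))


def run(argv=None):
  options = PipelineOptions(argv)
  # Resolve the project once, from the options object used to build the
  # pipeline, and pass it down -- mirroring the change in the diff above.
  project = options.view_as(GoogleCloudOptions).project
  with beam.Pipeline(options=options) as p:
    (p
     | beam.Create([{'team': 'red', 'total_score': 3}])
     | WriteRows('scores', 'game_dataset', project)
     | beam.Map(print))


if __name__ == '__main__':
  run()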
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 161678)
Time Spent: 1h 40m (was: 1.5h)
> get rid of <pipeline>.options deprecation warnings in tests
> -----------------------------------------------------------
>
> Key: BEAM-5462
> URL: https://issues.apache.org/jira/browse/BEAM-5462
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Udi Meiri
> Assignee: Heejong Lee
> Priority: Minor
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Messages look like:
> {{/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py:360:
> DeprecationWarning: options is deprecated since First stable release.
> References to <pipeline>.options will not be supported}}
> {{pipeline.replace_all(_get_transform_overrides(pipeline.options))}}
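Since the issue is about deprecation warnings surfacing in tests, a generic way to make them visible is the standard warnings module, not anything Beam-specific: record warnings while a small pipeline runs and assert none mention the deprecated options access. Note that on SDK versions where the DirectRunner itself still reads pipeline.options (as in the quoted message), the assertion below is expected to fail until that call site is cleaned up. A minimal sketch:

import unittest
import warnings

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PipelineOptionsDeprecationTest(unittest.TestCase):
  """Illustrative only: surface <pipeline>.options deprecation warnings."""

  def test_no_options_deprecation_warning(self):
    options = PipelineOptions(['--runner=DirectRunner'])
    with warnings.catch_warnings(record=True) as caught:
      # Record every warning, including DeprecationWarning, which is
      # normally filtered out by default.
      warnings.simplefilter('always')
      with beam.Pipeline(options=options) as p:
        _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)
    offending = [
        str(w.message) for w in caught
        if issubclass(w.category, DeprecationWarning)
        and 'options is deprecated' in str(w.message)]
    # Expected to fail on SDK versions where the runner still calls
    # pipeline.options internally, which is what BEAM-5462 tracks.
    self.assertEqual([], offending)


if __name__ == '__main__':
  unittest.main()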
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)