[ 
https://issues.apache.org/jira/browse/BEAM-3824?focusedWorklogId=83473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83473
 ]

ASF GitHub Bot logged work on BEAM-3824:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Mar/18 05:27
            Start Date: 23/Mar/18 05:27
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #4932: [BEAM-3824] Convert 
big query writes to beam.io.WriteToBigQuery in mobile gaming example
URL: https://github.com/apache/beam/pull/4932
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be visible once the pull request is merged):

diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py 
b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index 1f13ed180f6..11b81fe1bd2 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -40,7 +40,7 @@
 publishing.
 
 To run the Java injector:
-<beam_root>/examples/java8$ mvn compile exec:java \
+<beam_root>/examples/java$ mvn compile exec:java \
     -Dexec.mainClass=org.apache.beam.examples.complete.game.injector.Injector \
     -Dexec.args="$PROJECT_ID $PUBSUB_TOPIC none"
 
@@ -68,13 +68,6 @@
     --dataset $BIGQUERY_DATASET \
     --runner DataflowRunner \
     --temp_location gs://$BUCKET/user_score/temp
-
---------------------------------------------------------------------------------
-NOTE [BEAM-2354]: This example is not yet runnable by DataflowRunner.
-    The runner still needs support for:
-      * the --save_main_session flag when streaming is enabled
-      * combiners
---------------------------------------------------------------------------------
 """
 
 from __future__ import absolute_import
@@ -182,22 +175,19 @@ def get_schema(self):
     return ', '.join(
         '%s:%s' % (col, self.schema[col]) for col in self.schema)
 
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
 
   def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
+    project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
     return (
         pcoll
         | 'ConvertToRow' >> beam.Map(
             lambda elem: {col: elem[col] for col in self.schema})
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+        | beam.io.WriteToBigQuery(
+            self.table_name, self.dataset, project, self.get_schema()))
 
 
 # [START abuse_detect]
diff --git 
a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py 
b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index b286a6a5ddf..e99ab23ec67 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -175,22 +175,14 @@ def get_schema(self):
     return ', '.join(
         '%s:%s' % (col, self.schema[col]) for col in self.schema)
 
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
-
   def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
+    project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
     return (
         pcoll
         | 'ConvertToRow' >> beam.Map(
             lambda elem: {col: elem[col] for col in self.schema})
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+        | beam.io.WriteToBigQuery(
+            self.table_name, self.dataset, project, self.get_schema()))
 
 
 # [START main]
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py 
b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index e207f26712e..99a8e092822 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -48,7 +48,7 @@
 publishing.
 
 To run the Java injector:
-<beam_root>/examples/java8$ mvn compile exec:java \
+<beam_root>/examples/java$ mvn compile exec:java \
     -Dexec.mainClass=org.apache.beam.examples.complete.game.injector.Injector \
     -Dexec.args="$PROJECT_ID $PUBSUB_TOPIC none"
 
@@ -76,12 +76,6 @@
     --dataset $BIGQUERY_DATASET \
     --runner DataflowRunner \
     --temp_location gs://$BUCKET/user_score/temp
-
---------------------------------------------------------------------------------
-NOTE [BEAM-2354]: This example is not yet runnable by DataflowRunner.
-    The runner still needs support for:
-      * the --save_main_session flag when streaming is enabled
---------------------------------------------------------------------------------
 """
 
 from __future__ import absolute_import
@@ -190,22 +184,14 @@ def get_schema(self):
     return ', '.join(
         '%s:%s' % (col, self.schema[col]) for col in self.schema)
 
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
-
   def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
+    project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
     return (
         pcoll
         | 'ConvertToRow' >> beam.Map(
             lambda elem: {col: elem[col] for col in self.schema})
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+        | beam.io.WriteToBigQuery(
+            self.table_name, self.dataset, project, self.get_schema()))
 
 
 # [START window_and_trigger]


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83473)
    Time Spent: 1h  (was: 50m)

> Use WriteToBigQuery in Python mobile gaming examples.
> ------------------------------------------------------
>
>                 Key: BEAM-3824
>                 URL: https://issues.apache.org/jira/browse/BEAM-3824
>             Project: Beam
>          Issue Type: Bug
>          Components: examples-python
>            Reporter: Valentyn Tymofieiev
>            Assignee: David Cavazos
>            Priority: Minor
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> python -m apache_beam.examples.complete.game.hourly_team_score 
> --project=$PROJECT --dataset=beam_release_2_4_0 
> --input=gs://$BUCKET/mobile/first_5000_gaming_data.csv
> The pipeline fails with:
> INFO:root:finish <DoOperation WriteTeamScoreSums/WriteToBigQuery 
> output_tags=['out'], 
> receivers=[ConsumerSet[WriteTeamScoreSums/WriteToBigQuery.out0, 
> coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]> 
> Traceback (most recent call last):
>  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
>  "__main__", fname, loader, pkg_name) 
>  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
>  exec code in run_globals 
>  File 
> "/tmp/release_testing/r2.4.0_env/lib/python2.7/site-packages/apache_beam/examples/complete/game/hourly_team_score.py",
>  line 276, in <
> module> 
>  run() 
>  File 
> "/tmp/release_testing/r2.4.0_env/lib/python2.7/site-packages/apache_beam/examples/complete/game/hourly_team_score.py",
>  line 270, in r
> un 
>  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 389, in __exit__
>  self.run().wait_until_finish() 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 369, in run
>  self.to_runner_api(), self.runner, self._options).run(False) 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 382, in run
>  return self.runner.run_pipeline(self) 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 129, in run_pip
> eline 
>  return runner.run_pipeline(pipeline)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
>  line 215, in ru
> n_pipeline 
>  return self.run_via_runner_api(pipeline.to_runner_api())
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
>  line 218, in ru
> n_via_runner_api 
>  return self.run_stages(*self.create_stages(pipeline_proto))
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
>  line 837, in ru
> n_stages 
>  pcoll_buffers, safe_coders).process_bundle.metrics
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
>  line 938, in ru
> n_stage 
>  self._progress_frequency).process_bundle(data_input, data_output)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
>  line 1110, in p
> rocess_bundle 
>  result_future = self._controller.control_handler.push(process_bundle)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
>  line 1003, in p
> ush 
>  response = self.worker.do_instruction(request)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 185, in do_instruc
> tion 
>  request.instruction_id) 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 202, in process_bu
> ndle 
>  processor.process_bundle(instruction_id)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 298, in proc
> ess_bundle 
>  op.finish() 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/worker/operations.py",
>  line 389, in finish
>  self.dofn_runner.finish()
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/common.py",
>  line 517, in finish
>  self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/common.py",
>  line 508, in _invoke_bundle_method
>  self._reraise_augmented(exn) 
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/apache_beam/runners/common.py",
>  line 537, in _reraise_augmented
>  six.raise_from(new_exn, original_traceback)
>  File 
> "/tmp/release_testing/r2.4.0_env/local/lib/python2.7/site-packages/six.py", 
> line 737, in raise_from
>  raise value 
> RuntimeError: Could not successfully insert rows to BigQuery table 
> [google.com:clouddfe:beam_release_2_4_0.leader_board]. Errors: [<InsertEr
> rorsValueListEntry
>  errors: [<ErrorProto 
>  debugInfo: u'' 
>  location: u'processing_time' 
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 0>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 1>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 2>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
>  index: 3>, <InsertErrorsValueListEntry
>  errors: [<ErrorProto
>  debugInfo: u''
>  location: u'processing_time'
>  message: u'no such field.'
>  reason: u'invalid'>]
> ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to