This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c9b60bc  Minor fixes for python nexmark benchmarks
     new d9d1daf  Merge pull request #13969: Minor fixes for python nexmark benchmarks
c9b60bc is described below

commit c9b60bc6eaaa38ea09618e321cf3b67aa86b7c05
Author: Yichi Zhang <[email protected]>
AuthorDate: Thu Feb 11 16:24:26 2021 -0800

    Minor fixes for python nexmark benchmarks
---
 .../apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py | 10 +++++++---
 .../apache_beam/testing/benchmarks/nexmark/queries/query0.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query1.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query10.py  |  4 ++--
 .../apache_beam/testing/benchmarks/nexmark/queries/query11.py  |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query12.py  |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query2.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query3.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query4.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query5.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query6.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query7.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query8.py   |  2 +-
 .../apache_beam/testing/benchmarks/nexmark/queries/query9.py   |  2 +-
 14 files changed, 21 insertions(+), 17 deletions(-)

diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
index e7ce033..00f45fb 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
@@ -245,7 +245,7 @@ class NexmarkLauncher(object):
         | 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
     return events
 
-  def run_query(self, query, query_args, query_errors):
+  def run_query(self, query, query_args, pipeline_options, query_errors):
     try:
       self.pipeline = beam.Pipeline(options=self.pipeline_options)
       nexmark_util.setup_coder()
@@ -263,7 +263,7 @@ class NexmarkLauncher(object):
         events = self.read_from_file()
 
       events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
-      output = query.load(events, query_args)
+      output = query.load(events, query_args, pipeline_options)
+      output | 'result_monitor' >> beam.ParDo(result_monitor.doFn)  # pylint: disable=expression-not-assigned
 
       result = self.pipeline.run()
@@ -430,7 +430,11 @@ class NexmarkLauncher(object):
     query_errors = []
     for i in self.args.query:
       logging.info('Running query %d', i)
-      self.run_query(queries[i], query_args, query_errors=query_errors)
+      self.run_query(
+          queries[i],
+          query_args,
+          self.pipeline_options,
+          query_errors=query_errors)
 
     if query_errors:
       logging.error('Query failed with %s', ', '.join(query_errors))
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
index a4c5055..8be89a8 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
@@ -40,7 +40,7 @@ class RoundTripFn(beam.DoFn):
     yield recon
 
 
-def load(events, query_args=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | 'serialization_and_deserialization' >> beam.ParDo(RoundTripFn()))
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
index acb205f..7c84f12 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
@@ -34,7 +34,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import 
nexmark_query_util
 USD_TO_EURO = 0.89
 
 
-def load(events, query_args=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | nexmark_query_util.JustBids()
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py
index f678d8d..9e8adbc 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py
@@ -31,7 +31,7 @@ from apache_beam.transforms import window
 from apache_beam.utils.timestamp import Duration
 
 NUM_SHARD_PER_WORKER = 5
-LATE_BATCHING_PERIOD = Duration.of(10)
+LATE_BATCHING_PERIOD = 10
 
 output_path = None
 max_num_workers = 5
@@ -78,7 +78,7 @@ def index_path_for(window):
     return None
 
 
-def load(events, pipeline_options, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | 'query10_shard_events' >> beam.ParDo(ShardEventsDoFn())
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py
index 1da48df..2cf5b59 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py
@@ -33,7 +33,7 @@ from apache_beam.transforms import trigger
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
 
   return (
       events
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
index 92a2d0d..e2b480a 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
@@ -32,7 +32,7 @@ from apache_beam.transforms import trigger
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | nexmark_query_util.JustBids()
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
index db3d33c..7e50fa7 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
@@ -32,7 +32,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import 
nexmark_query_util
 from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import 
ResultNames
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | nexmark_query_util.JustBids()
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
index f723f5a..2ada862 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
@@ -45,7 +45,7 @@ from apache_beam.transforms import window
 from apache_beam.transforms.userstate import on_timer
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   num_events_in_pane = 30
   windowed_events = (
       events
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
index e422619..b4cfb61 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
@@ -48,7 +48,7 @@ from 
apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import Re
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   # find winning bids for each closed auction
   all_winning_bids = (
       events
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py
index 766a84e..fbfa791 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py
@@ -39,7 +39,7 @@ from 
apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import Re
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | nexmark_query_util.JustBids()
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
index 6dabe37..e759932 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
@@ -39,7 +39,7 @@ from apache_beam.transforms import trigger
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   # find winning bids for each closed auction
   return (
       events
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py
index 8f16b49..50b0f72 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py
@@ -36,7 +36,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import 
nexmark_query_util
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   # window bids into fixed window
   sliding_bids = (
       events
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py
index 8f0e423..8a3ec1c 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py
@@ -35,7 +35,7 @@ from 
apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import Re
 from apache_beam.transforms import window
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   # window person and key by persons' id
   persons_by_id = (
       events
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py 
b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
index 180d850..c1aef26 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
@@ -27,7 +27,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import 
nexmark_query_util
 from apache_beam.testing.benchmarks.nexmark.queries import winning_bids
 
 
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
   return (
       events
       | beam.Filter(nexmark_query_util.auction_or_bid)

Reply via email to the commits mailing list.