This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c9b60bc Minor fixes for python nexmark benchmarks
new d9d1daf Merge pull request #13969 from Minor fixes for python nexmark benchmarks
c9b60bc is described below
commit c9b60bc6eaaa38ea09618e321cf3b67aa86b7c05
Author: Yichi Zhang <[email protected]>
AuthorDate: Thu Feb 11 16:24:26 2021 -0800
Minor fixes for python nexmark benchmarks
---
.../apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py | 10 +++++++---
.../apache_beam/testing/benchmarks/nexmark/queries/query0.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query1.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query10.py | 4 ++--
.../apache_beam/testing/benchmarks/nexmark/queries/query11.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query12.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query2.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query3.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query4.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query5.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query6.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query7.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query8.py | 2 +-
.../apache_beam/testing/benchmarks/nexmark/queries/query9.py | 2 +-
14 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
index e7ce033..00f45fb 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_launcher.py
@@ -245,7 +245,7 @@ class NexmarkLauncher(object):
| 'deserialization' >> beam.ParDo(nexmark_util.ParseJsonEventFn()))
return events
- def run_query(self, query, query_args, query_errors):
+ def run_query(self, query, query_args, pipeline_options, query_errors):
try:
self.pipeline = beam.Pipeline(options=self.pipeline_options)
nexmark_util.setup_coder()
@@ -263,7 +263,7 @@ class NexmarkLauncher(object):
events = self.read_from_file()
events = events | 'event_monitor' >> beam.ParDo(event_monitor.doFn)
- output = query.load(events, query_args)
+ output = query.load(events, query_args, pipeline_options)
output | 'result_monitor' >> beam.ParDo(result_monitor.doFn) # pylint: disable=expression-not-assigned
result = self.pipeline.run()
@@ -430,7 +430,11 @@ class NexmarkLauncher(object):
query_errors = []
for i in self.args.query:
logging.info('Running query %d', i)
- self.run_query(queries[i], query_args, query_errors=query_errors)
+ self.run_query(
+ queries[i],
+ query_args,
+ self.pipeline_options,
+ query_errors=query_errors)
if query_errors:
logging.error('Query failed with %s', ', '.join(query_errors))
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
index a4c5055..8be89a8 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query0.py
@@ -40,7 +40,7 @@ class RoundTripFn(beam.DoFn):
yield recon
-def load(events, query_args=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| 'serialization_and_deserialization' >> beam.ParDo(RoundTripFn()))
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
index acb205f..7c84f12 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query1.py
@@ -34,7 +34,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
USD_TO_EURO = 0.89
-def load(events, query_args=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| nexmark_query_util.JustBids()
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py
index f678d8d..9e8adbc 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query10.py
@@ -31,7 +31,7 @@ from apache_beam.transforms import window
from apache_beam.utils.timestamp import Duration
NUM_SHARD_PER_WORKER = 5
-LATE_BATCHING_PERIOD = Duration.of(10)
+LATE_BATCHING_PERIOD = 10
output_path = None
max_num_workers = 5
@@ -78,7 +78,7 @@ def index_path_for(window):
return None
-def load(events, pipeline_options, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| 'query10_shard_events' >> beam.ParDo(ShardEventsDoFn())
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py
index 1da48df..2cf5b59 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query11.py
@@ -33,7 +33,7 @@ from apache_beam.transforms import trigger
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
index 92a2d0d..e2b480a 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query12.py
@@ -32,7 +32,7 @@ from apache_beam.transforms import trigger
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| nexmark_query_util.JustBids()
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
index db3d33c..7e50fa7 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query2.py
@@ -32,7 +32,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| nexmark_query_util.JustBids()
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
index f723f5a..2ada862 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query3.py
@@ -45,7 +45,7 @@ from apache_beam.transforms import window
from apache_beam.transforms.userstate import on_timer
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
num_events_in_pane = 30
windowed_events = (
events
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
index e422619..b4cfb61 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query4.py
@@ -48,7 +48,7 @@ from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import Re
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
# find winning bids for each closed auction
all_winning_bids = (
events
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py
index 766a84e..fbfa791 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query5.py
@@ -39,7 +39,7 @@ from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import Re
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| nexmark_query_util.JustBids()
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
index 6dabe37..e759932 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query6.py
@@ -39,7 +39,7 @@ from apache_beam.transforms import trigger
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
# find winning bids for each closed auction
return (
events
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py
index 8f16b49..50b0f72 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query7.py
@@ -36,7 +36,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
# window bids into fixed window
sliding_bids = (
events
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py
index 8f0e423..8a3ec1c 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query8.py
@@ -35,7 +35,7 @@ from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import Re
from apache_beam.transforms import window
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
# window person and key by persons' id
persons_by_id = (
events
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
index 180d850..c1aef26 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/queries/query9.py
@@ -27,7 +27,7 @@ from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries import winning_bids
-def load(events, metadata=None):
+def load(events, metadata=None, pipeline_options=None):
return (
events
| beam.Filter(nexmark_query_util.auction_or_bid)