This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c5b15f6 Cythonizing a few hot calls (#5733)
c5b15f6 is described below
commit c5b15f6100237eac4a06fecb16d0cc1deefbe8a7
Author: Ahmet Altay <[email protected]>
AuthorDate: Mon Jun 25 22:59:11 2018 -0700
Cythonizing a few hot calls (#5733)
* Cythonizing a few more hot calls.
---
.../apache_beam/runners/worker/operations.pxd | 11 ++++++++---
.../apache_beam/runners/worker/operations.py | 22 +++++++++++-----------
2 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 0aee337..5a36bba 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -26,6 +26,7 @@ from apache_beam.metrics.execution cimport ScopedMetricsContainer
cdef WindowedValue _globally_windowed_value
cdef type _global_window_type
+
cdef class ConsumerSet(Receiver):
cdef list consumers
cdef readonly opcounters.OperationCounters opcounter
@@ -62,24 +63,25 @@ cdef class Operation(object):
cpdef start(self)
cpdef process(self, WindowedValue windowed_value)
cpdef finish(self)
-
cpdef output(self, WindowedValue windowed_value, int output_index=*)
+ cpdef progress_metrics(self)
+
cdef class ReadOperation(Operation):
@cython.locals(windowed_value=WindowedValue)
cpdef start(self)
+
cdef class DoOperation(Operation):
cdef object dofn_runner
cdef Receiver dofn_receiver
cdef object tagged_receivers
cdef object side_input_maps
+
cdef class CombineOperation(Operation):
cdef object phased_combine_fn
-cdef class FlattenOperation(Operation):
- pass
cdef class PGBKCVOperation(Operation):
cdef public object combine_fn
@@ -90,3 +92,6 @@ cdef class PGBKCVOperation(Operation):
cpdef output_key(self, tuple wkey, value)
+
+cdef class FlattenOperation(Operation):
+ pass
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 9aa29b8..729ec82 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -163,14 +163,14 @@ class Operation(object):
self.consumers[i], coder)
for i, coder in enumerate(self.spec.output_coders)]
- def finish(self):
- """Finish operation."""
- pass
-
def process(self, o):
"""Process element in operation."""
pass
+ def finish(self):
+ """Finish operation."""
+ pass
+
def output(self, windowed_value, output_index=0):
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
@@ -395,14 +395,14 @@ class DoOperation(Operation):
self.dofn_runner.start()
- def finish(self):
- with self.scoped_finish_state:
- self.dofn_runner.finish()
-
def process(self, o):
with self.scoped_process_state:
self.dofn_receiver.receive(o)
+ def finish(self):
+ with self.scoped_finish_state:
+ self.dofn_runner.finish()
+
def progress_metrics(self):
metrics = super(DoOperation, self).progress_metrics()
if self.tagged_receivers:
@@ -435,9 +435,6 @@ class CombineOperation(Operation):
self.phased_combine_fn = (
PhasedCombineFnExecutor(self.spec.phase, fn, args, kwargs))
- def finish(self):
- logging.debug('Finishing %s', self)
-
def process(self, o):
with self.scoped_process_state:
if self.debug_logging_enabled:
@@ -446,6 +443,9 @@ class CombineOperation(Operation):
self.output(
o.with_value((key, self.phased_combine_fn.apply(values))))
+ def finish(self):
+ logging.debug('Finishing %s', self)
+
def create_pgbk_op(step_name, spec, counter_factory, state_sampler):
if spec.combine_fn: