This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c5b15f6  Cythonizing a few hot calls (#5733)
c5b15f6 is described below

commit c5b15f6100237eac4a06fecb16d0cc1deefbe8a7
Author: Ahmet Altay <[email protected]>
AuthorDate: Mon Jun 25 22:59:11 2018 -0700

    Cythonizing a few hot calls (#5733)
    
    * Cythonizing a few more hot calls.
---
 .../apache_beam/runners/worker/operations.pxd      | 11 ++++++++---
 .../apache_beam/runners/worker/operations.py       | 22 +++++++++++-----------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 0aee337..5a36bba 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -26,6 +26,7 @@ from apache_beam.metrics.execution cimport ScopedMetricsContainer
 cdef WindowedValue _globally_windowed_value
 cdef type _global_window_type
 
+
 cdef class ConsumerSet(Receiver):
   cdef list consumers
   cdef readonly opcounters.OperationCounters opcounter
@@ -62,24 +63,25 @@ cdef class Operation(object):
   cpdef start(self)
   cpdef process(self, WindowedValue windowed_value)
   cpdef finish(self)
-
   cpdef output(self, WindowedValue windowed_value, int output_index=*)
+  cpdef progress_metrics(self)
+
 
 cdef class ReadOperation(Operation):
   @cython.locals(windowed_value=WindowedValue)
   cpdef start(self)
 
+
 cdef class DoOperation(Operation):
   cdef object dofn_runner
   cdef Receiver dofn_receiver
   cdef object tagged_receivers
   cdef object side_input_maps
 
+
 cdef class CombineOperation(Operation):
   cdef object phased_combine_fn
 
-cdef class FlattenOperation(Operation):
-  pass
 
 cdef class PGBKCVOperation(Operation):
   cdef public object combine_fn
@@ -90,3 +92,6 @@ cdef class PGBKCVOperation(Operation):
 
   cpdef output_key(self, tuple wkey, value)
 
+
+cdef class FlattenOperation(Operation):
+  pass
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 9aa29b8..729ec82 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -163,14 +163,14 @@ class Operation(object):
                                     self.consumers[i], coder)
                         for i, coder in enumerate(self.spec.output_coders)]
 
-  def finish(self):
-    """Finish operation."""
-    pass
-
   def process(self, o):
     """Process element in operation."""
     pass
 
+  def finish(self):
+    """Finish operation."""
+    pass
+
   def output(self, windowed_value, output_index=0):
     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
 
@@ -395,14 +395,14 @@ class DoOperation(Operation):
 
       self.dofn_runner.start()
 
-  def finish(self):
-    with self.scoped_finish_state:
-      self.dofn_runner.finish()
-
   def process(self, o):
     with self.scoped_process_state:
       self.dofn_receiver.receive(o)
 
+  def finish(self):
+    with self.scoped_finish_state:
+      self.dofn_runner.finish()
+
   def progress_metrics(self):
     metrics = super(DoOperation, self).progress_metrics()
     if self.tagged_receivers:
@@ -435,9 +435,6 @@ class CombineOperation(Operation):
     self.phased_combine_fn = (
         PhasedCombineFnExecutor(self.spec.phase, fn, args, kwargs))
 
-  def finish(self):
-    logging.debug('Finishing %s', self)
-
   def process(self, o):
     with self.scoped_process_state:
       if self.debug_logging_enabled:
@@ -446,6 +443,9 @@ class CombineOperation(Operation):
       self.output(
           o.with_value((key, self.phased_combine_fn.apply(values))))
 
+  def finish(self):
+    logging.debug('Finishing %s', self)
+
 
 def create_pgbk_op(step_name, spec, counter_factory, state_sampler):
   if spec.combine_fn:

Reply via email to