Repository: beam
Updated Branches:
  refs/heads/master a67019739 -> e5507d827


Adding validatesrunner test for sources


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b6e74e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b6e74e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b6e74e8

Branch: refs/heads/master
Commit: 9b6e74e8bd0ab5c357e09c0b8ea245ba8dc7ad5c
Parents: a670197
Author: Pablo <[email protected]>
Authored: Wed Apr 19 09:44:54 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Sun Apr 23 20:23:20 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 98 ++++++++++----------
 .../apache_beam/transforms/ptransform_test.py   | 27 ++++++
 2 files changed, 78 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9b6e74e8/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py 
b/sdks/python/apache_beam/examples/snippets/snippets.py
index 85ab864..c566914 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -570,6 +570,57 @@ def examples_wordcount_debugging(renames):
   p.run()
 
 
+import apache_beam as beam
+from apache_beam.io import iobase
+from apache_beam.io.range_trackers import OffsetRangeTracker
+from apache_beam.transforms.core import PTransform
+from apache_beam.utils.pipeline_options import PipelineOptions
+
+
+# Defining a new source.
+# [START model_custom_source_new_source]
+class CountingSource(iobase.BoundedSource):
+  """Bounded source that yields the integers 0 .. count - 1."""
+
+  def __init__(self, count):
+    self.records_read = Metrics.counter(self.__class__, 'recordsRead')
+    self._count = count
+
+  def estimate_size(self):
+    return self._count
+
+  def get_range_tracker(self, start_position, stop_position):
+    # A missing bound falls back to the full range [0, count).
+    start = 0 if start_position is None else start_position
+    stop = self._count if stop_position is None else stop_position
+    return OffsetRangeTracker(start, stop)
+
+  def read(self, range_tracker):
+    # Begin at the tracker's start offset: a bundle from split() covers only
+    # its own slice, so iteration must not restart from 0.
+    for i in range(range_tracker.start_position(), self._count):
+      if not range_tracker.try_claim(i):
+        return
+      self.records_read.inc()
+      yield i
+
+  def split(self, desired_bundle_size, start_position=None,
+            stop_position=None):
+    # Yields consecutive bundles covering [start, stop), each at most
+    # desired_bundle_size wide (the last one may be shorter).
+    start = 0 if start_position is None else start_position
+    stop = self._count if stop_position is None else stop_position
+    step = max(1, desired_bundle_size)  # always advance, even for size <= 0
+    while start < stop:
+      bundle_stop = min(stop, start + step)  # clamp to the end of the range
+      yield iobase.SourceBundle(weight=(bundle_stop - start),
+                                source=self,
+                                start_position=start,
+                                stop_position=bundle_stop)
+      start = bundle_stop
+# [END model_custom_source_new_source]
+
+
 def model_custom_source(count):
   """Demonstrates creating a new custom source and using it in a pipeline.
 
@@ -595,53 +646,6 @@ def model_custom_source(count):
 
   """
 
-  import apache_beam as beam
-  from apache_beam.io import iobase
-  from apache_beam.io.range_trackers import OffsetRangeTracker
-  from apache_beam.transforms.core import PTransform
-  from apache_beam.utils.pipeline_options import PipelineOptions
-
-  # Defining a new source.
-  # [START model_custom_source_new_source]
-  class CountingSource(iobase.BoundedSource):
-
-    def __init__(self, count):
-      self._count = count
-
-    def estimate_size(self):
-      return self._count
-
-    def get_range_tracker(self, start_position, stop_position):
-      if start_position is None:
-        start_position = 0
-      if stop_position is None:
-        stop_position = self._count
-
-      return OffsetRangeTracker(start_position, stop_position)
-
-    def read(self, range_tracker):
-      for i in range(self._count):
-        if not range_tracker.try_claim(i):
-          return
-        yield i
-
-    def split(self, desired_bundle_size, start_position=None,
-              stop_position=None):
-      if start_position is None:
-        start_position = 0
-      if stop_position is None:
-        stop_position = self._count
-
-      bundle_start = start_position
-      while bundle_start < self._count:
-        bundle_stop = max(self._count, bundle_start + desired_bundle_size)
-        yield iobase.SourceBundle(weight=(bundle_stop - bundle_start),
-                                  source=self,
-                                  start_position=bundle_start,
-                                  stop_position=bundle_stop)
-        bundle_start = bundle_stop
-  # [END model_custom_source_new_source]
-
   # Using the source in an example pipeline.
   # [START model_custom_source_use_new_source]
   p = beam.Pipeline(options=PipelineOptions())

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6e74e8/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py 
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 78277c2..303dfb8 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -27,6 +27,9 @@ import hamcrest as hc
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.io.iobase import Read
 from apache_beam.test_pipeline import TestPipeline
 import apache_beam.pvalue as pvalue
 import apache_beam.transforms.combiners as combine
@@ -169,6 +172,30 @@ class PTransformTest(unittest.TestCase):
     pipeline.run()
 
   @attr('ValidatesRunner')
+  def test_read_from_text_metrics(self):
+    # Reads 100 values from CountingSource and checks its 'recordsRead' counter.
+    from apache_beam.examples.snippets.snippets import CountingSource
+
+    class CounterDoFn(beam.DoFn):
+      def __init__(self):
+        namespace = self.__class__
+        self.received_records = Metrics.counter(namespace, 'receivedRecords')
+
+      def process(self, element):
+        self.received_records.inc()
+
+    pipeline = TestPipeline()
+    _ = pipeline | Read(CountingSource(100)) | beam.ParDo(CounterDoFn())
+    run_result = pipeline.run()
+    run_result.wait_until_finish()
+    name_filter = MetricsFilter().with_name('recordsRead')
+    read_counter = run_result.metrics().query(name_filter)['counters'][0]
+    self.assertEqual(read_counter.key.step, 'Read')
+    self.assertEqual(read_counter.key.metric.name, 'recordsRead')
+    self.assertEqual(read_counter.committed, 100)
+    self.assertEqual(read_counter.attempted, 100)
+
+  @attr('ValidatesRunner')
   def test_par_do_with_multiple_outputs_and_using_yield(self):
     class SomeDoFn(beam.DoFn):
       """A custom DoFn using yield."""

Reply via email to