Repository: beam
Updated Branches:
  refs/heads/master a67019739 -> e5507d827
Adding validatesrunner test for sources Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b6e74e8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b6e74e8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b6e74e8 Branch: refs/heads/master Commit: 9b6e74e8bd0ab5c357e09c0b8ea245ba8dc7ad5c Parents: a670197 Author: Pablo <[email protected]> Authored: Wed Apr 19 09:44:54 2017 -0700 Committer: [email protected] <[email protected]> Committed: Sun Apr 23 20:23:20 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 98 ++++++++++---------- .../apache_beam/transforms/ptransform_test.py | 27 ++++++ 2 files changed, 78 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9b6e74e8/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 85ab864..c566914 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -570,6 +570,57 @@ def examples_wordcount_debugging(renames): p.run() +import apache_beam as beam +from apache_beam.io import iobase +from apache_beam.io.range_trackers import OffsetRangeTracker +from apache_beam.transforms.core import PTransform +from apache_beam.utils.pipeline_options import PipelineOptions + + +# Defining a new source. 
+# [START model_custom_source_new_source] +class CountingSource(iobase.BoundedSource): + + def __init__(self, count): + self.records_read = Metrics.counter(self.__class__, 'recordsRead') + self._count = count + + def estimate_size(self): + return self._count + + def get_range_tracker(self, start_position, stop_position): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = self._count + + return OffsetRangeTracker(start_position, stop_position) + + def read(self, range_tracker): + for i in range(self._count): + if not range_tracker.try_claim(i): + return + self.records_read.inc() + yield i + + def split(self, desired_bundle_size, start_position=None, + stop_position=None): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = self._count + + bundle_start = start_position + while bundle_start < self._count: + bundle_stop = max(self._count, bundle_start + desired_bundle_size) + yield iobase.SourceBundle(weight=(bundle_stop - bundle_start), + source=self, + start_position=bundle_start, + stop_position=bundle_stop) + bundle_start = bundle_stop +# [END model_custom_source_new_source] + + def model_custom_source(count): """Demonstrates creating a new custom source and using it in a pipeline. @@ -595,53 +646,6 @@ def model_custom_source(count): """ - import apache_beam as beam - from apache_beam.io import iobase - from apache_beam.io.range_trackers import OffsetRangeTracker - from apache_beam.transforms.core import PTransform - from apache_beam.utils.pipeline_options import PipelineOptions - - # Defining a new source. 
- # [START model_custom_source_new_source] - class CountingSource(iobase.BoundedSource): - - def __init__(self, count): - self._count = count - - def estimate_size(self): - return self._count - - def get_range_tracker(self, start_position, stop_position): - if start_position is None: - start_position = 0 - if stop_position is None: - stop_position = self._count - - return OffsetRangeTracker(start_position, stop_position) - - def read(self, range_tracker): - for i in range(self._count): - if not range_tracker.try_claim(i): - return - yield i - - def split(self, desired_bundle_size, start_position=None, - stop_position=None): - if start_position is None: - start_position = 0 - if stop_position is None: - stop_position = self._count - - bundle_start = start_position - while bundle_start < self._count: - bundle_stop = max(self._count, bundle_start + desired_bundle_size) - yield iobase.SourceBundle(weight=(bundle_stop - bundle_start), - source=self, - start_position=bundle_start, - stop_position=bundle_stop) - bundle_start = bundle_stop - # [END model_custom_source_new_source] - # Using the source in an example pipeline. 
# [START model_custom_source_use_new_source] p = beam.Pipeline(options=PipelineOptions()) http://git-wip-us.apache.org/repos/asf/beam/blob/9b6e74e8/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 78277c2..303dfb8 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -27,6 +27,9 @@ import hamcrest as hc from nose.plugins.attrib import attr import apache_beam as beam +from apache_beam.metrics import Metrics +from apache_beam.metrics.metric import MetricsFilter +from apache_beam.io.iobase import Read from apache_beam.test_pipeline import TestPipeline import apache_beam.pvalue as pvalue import apache_beam.transforms.combiners as combine @@ -169,6 +172,30 @@ class PTransformTest(unittest.TestCase): pipeline.run() @attr('ValidatesRunner') + def test_read_from_text_metrics(self): + from apache_beam.examples.snippets.snippets import CountingSource + + class CounterDoFn(beam.DoFn): + def __init__(self): + self.received_records = Metrics.counter(self.__class__, + 'receivedRecords') + + def process(self, element): + self.received_records.inc() + + pipeline = TestPipeline() + (pipeline | Read(CountingSource(100)) | beam.ParDo(CounterDoFn())) + res = pipeline.run() + res.wait_until_finish() + metric_results = res.metrics().query(MetricsFilter() + .with_name('recordsRead')) + outputs_counter = metric_results['counters'][0] + self.assertEqual(outputs_counter.key.step, 'Read') + self.assertEqual(outputs_counter.key.metric.name, 'recordsRead') + self.assertEqual(outputs_counter.committed, 100) + self.assertEqual(outputs_counter.attempted, 100) + + @attr('ValidatesRunner') def test_par_do_with_multiple_outputs_and_using_yield(self): class SomeDoFn(beam.DoFn): """A custom DoFn using yield."""
