Repository: beam
Updated Branches:
  refs/heads/master e26bfbe0a -> 4121ec490
Remove vestigial Read and Write from core.py Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c25380be Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c25380be Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c25380be Branch: refs/heads/master Commit: c25380be883405862e3620cecf3aa6b00945ec4b Parents: e26bfbe Author: Robert Bradshaw <[email protected]> Authored: Tue Apr 18 16:23:51 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Apr 20 08:59:51 2017 -0700 ---------------------------------------------------------------------- .../examples/cookbook/bigquery_side_input.py | 2 +- .../apache_beam/examples/cookbook/filters.py | 2 +- sdks/python/apache_beam/io/concat_source_test.py | 2 +- .../python/apache_beam/io/filebasedsource_test.py | 18 +++++++++--------- sdks/python/apache_beam/io/sources_test.py | 2 +- sdks/python/apache_beam/io/tfrecordio_test.py | 8 ++++---- sdks/python/apache_beam/pipeline_test.py | 2 +- sdks/python/apache_beam/transforms/core.py | 13 ++----------- 8 files changed, 20 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index 486cc88..f68c95d 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -101,7 +101,7 @@ def run(argv=None): pcoll_corpus = p | 'read corpus' >> beam.io.Read( beam.io.BigQuerySource(query=query_corpus)) - pcoll_word = p | 'read_words' >> beam.Read( + pcoll_word = p | 'read_words' >> beam.io.Read( beam.io.BigQuerySource(query=query_word)) 
pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( [ignore_corpus]) http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/examples/cookbook/filters.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index d13d823..374001c 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -88,7 +88,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input)) + input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input)) # pylint: disable=expression-not-assigned (filter_cold_days(input_data, known_args.month_filter) http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/concat_source_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 77d2647..7c16e63 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -214,7 +214,7 @@ class ConcatSourceTest(unittest.TestCase): RangeSource(100, 1000), ]) pipeline = TestPipeline() - pcoll = pipeline | beam.Read(source) + pcoll = pipeline | beam.io.Read(source) assert_that(pcoll, equal_to(range(1000))) pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index e681f26..5318c4d 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -389,7 +389,7 @@ 
class TestFileBasedSource(unittest.TestCase): def _run_source_test(self, pattern, expected_data, splittable=True): pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( pattern, splittable=splittable)) assert_that(pcoll, equal_to(expected_data)) pipeline.run() @@ -429,7 +429,7 @@ class TestFileBasedSource(unittest.TestCase): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, splittable=False, compression_type=CompressionTypes.BZIP2)) @@ -444,7 +444,7 @@ class TestFileBasedSource(unittest.TestCase): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, splittable=False, compression_type=CompressionTypes.GZIP)) @@ -462,7 +462,7 @@ class TestFileBasedSource(unittest.TestCase): compressobj.compress('\n'.join(c)) + compressobj.flush()) file_pattern = write_prepared_pattern(compressed_chunks) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, splittable=False, compression_type=CompressionTypes.BZIP2)) @@ -481,7 +481,7 @@ class TestFileBasedSource(unittest.TestCase): compressed_chunks.append(out.getvalue()) file_pattern = write_prepared_pattern(compressed_chunks) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, splittable=False, compression_type=CompressionTypes.GZIP)) @@ -496,7 +496,7 @@ class TestFileBasedSource(unittest.TestCase): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, 
equal_to(lines)) @@ -510,7 +510,7 @@ class TestFileBasedSource(unittest.TestCase): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) @@ -529,7 +529,7 @@ class TestFileBasedSource(unittest.TestCase): file_pattern = write_prepared_pattern( compressed_chunks, suffixes=['.gz']*len(chunks)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) @@ -551,7 +551,7 @@ class TestFileBasedSource(unittest.TestCase): file_pattern = write_prepared_pattern(chunks_to_write, suffixes=(['.gz', '']*3)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/sources_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py index dc0fd54..3f92756 100644 --- a/sdks/python/apache_beam/io/sources_test.py +++ b/sdks/python/apache_beam/io/sources_test.py @@ -100,7 +100,7 @@ class SourcesTest(unittest.TestCase): def test_run_direct(self): file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd') pipeline = TestPipeline() - pcoll = pipeline | beam.Read(LineSource(file_name)) + pcoll = pipeline | beam.io.Read(LineSource(file_name)) assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd'])) pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/io/tfrecordio_test.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py index 49f9639..d8c706e 100644 --- a/sdks/python/apache_beam/io/tfrecordio_test.py +++ b/sdks/python/apache_beam/io/tfrecordio_test.py @@ -248,7 +248,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): self._write_file(path, FOO_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), @@ -261,7 +261,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): self._write_file(path, FOO_BAR_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), @@ -274,7 +274,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): self._write_file_gzip(path, FOO_BAR_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), @@ -287,7 +287,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): self._write_file_gzip(path, FOO_BAR_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 6314609..05503bd 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -25,6 +25,7 @@ import unittest # from nose.plugins.attrib import attr import apache_beam as beam +from apache_beam.io import Read from apache_beam.metrics import Metrics from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions @@ -39,7 +40,6 @@ from apache_beam.transforms import Map 
from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform -from apache_beam.transforms import Read from apache_beam.transforms import WindowInto from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to http://git-wip-us.apache.org/repos/asf/beam/blob/c25380be/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9f66c39..4709056 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1364,13 +1364,14 @@ class Create(PTransform): return Union[[trivial_inference.instance_to_type(v) for v in self.value]] def expand(self, pbegin): + from apache_beam.io import iobase assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline ouput_type = (self.get_type_hints().simple_output_type(self.label) or self.infer_output_type(None)) coder = typecoders.registry.get_coder(ouput_type) source = self._create_source_from_iterable(self.value, coder) - return pbegin.pipeline | Read(source).with_output_types(ouput_type) + return pbegin.pipeline | iobase.Read(source).with_output_types(ouput_type) def get_windowing(self, unused_inputs): return Windowing(GlobalWindows()) @@ -1458,13 +1459,3 @@ class Create(PTransform): return self._total_size return _CreateSource(serialized_values, coder) - - -def Read(*args, **kwargs): - from apache_beam import io - return io.Read(*args, **kwargs) - - -def Write(*args, **kwargs): - from apache_beam import io - return io.Write(*args, **kwargs)
