Repository: beam Updated Branches: refs/heads/master 0a8ac3528 -> 82c5e89ca
Clean up in textio and tfrecordio Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e05e6011 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e05e6011 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e05e6011 Branch: refs/heads/master Commit: e05e60117f03fbcb1ba99fd5205b04cd37a917da Parents: 0a8ac35 Author: Ahmet Altay <[email protected]> Authored: Fri Apr 7 11:21:17 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Fri Apr 7 11:46:59 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/textio.py | 1 - sdks/python/apache_beam/io/tfrecordio.py | 12 +++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e05e6011/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 8122fae..9217e74 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -385,7 +385,6 @@ class ReadFromText(PTransform): """ super(ReadFromText, self).__init__(**kwargs) - self._strip_trailing_newlines = strip_trailing_newlines self._source = _TextSource( file_pattern, min_bundle_size, compression_type, strip_trailing_newlines, coder, validate=validate, http://git-wip-us.apache.org/repos/asf/beam/blob/e05e6011/sdks/python/apache_beam/io/tfrecordio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py index 8b9d9ea..e2b41bf 100644 --- a/sdks/python/apache_beam/io/tfrecordio.py +++ b/sdks/python/apache_beam/io/tfrecordio.py @@ -201,10 +201,11 @@ class ReadFromTFRecord(PTransform): A ReadFromTFRecord transform object. """ super(ReadFromTFRecord, self).__init__(**kwargs) - self._args = (file_pattern, coder, compression_type, validate) + self._source = _TFRecordSource(file_pattern, coder, compression_type, + validate) def expand(self, pvalue): - return pvalue.pipeline | Read(_TFRecordSource(*self._args)) + return pvalue.pipeline | Read(self._source) class _TFRecordSink(fileio.FileSink): @@ -270,8 +271,9 @@ class WriteToTFRecord(PTransform): A WriteToTFRecord transform object. """ super(WriteToTFRecord, self).__init__(**kwargs) - self._args = (file_path_prefix, coder, file_name_suffix, num_shards, - shard_name_template, compression_type) + self._sink = _TFRecordSink(file_path_prefix, coder, file_name_suffix, + num_shards, shard_name_template, + compression_type) def expand(self, pcoll): - return pcoll | Write(_TFRecordSink(*self._args)) + return pcoll | Write(self._sink)
