This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 08f3ffa67358edfead752acb9b0fc3bf7e7cbe4a
Merge: 814a80a c635c63
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Tue Feb 13 16:02:27 2018 -0800

    Merge pull request #4570 from [BEAM-1251] Migrate away from xrange()
    
    [BEAM-1251] Migrate away from xrange()

 sdks/python/apache_beam/examples/complete/estimate_pi.py       |  2 +-
 .../examples/complete/juliaset/juliaset/juliaset.py            |  2 +-
 .../apache_beam/examples/cookbook/bigquery_side_input.py       |  2 +-
 sdks/python/apache_beam/examples/snippets/snippets.py          |  2 +-
 sdks/python/apache_beam/io/filebasedsink.py                    |  8 ++++----
 sdks/python/apache_beam/io/filebasedsource_test.py             | 10 +++++-----
 sdks/python/apache_beam/io/tfrecordio_test.py                  |  4 ++--
 sdks/python/apache_beam/runners/worker/opcounters_test.py      |  8 ++++----
 sdks/python/apache_beam/transforms/combiners_test.py           |  4 ++--
 sdks/python/apache_beam/transforms/trigger_test.py             |  2 +-
 sdks/python/apache_beam/typehints/native_type_compatibility.py |  2 +-
 11 files changed, 23 insertions(+), 23 deletions(-)

diff --cc sdks/python/apache_beam/io/tfrecordio_test.py
index 5bc13ce,191c757..c540cfa
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@@ -195,202 -203,127 +195,202 @@@ class TestTFRecordSink(unittest.TestCas
  class TestWriteToTFRecord(TestTFRecordSink):
  
    def test_write_record_gzip(self):
 -    file_path_prefix = os.path.join(self._new_tempdir(), 'result')
 -    with TestPipeline() as p:
 -      input_data = ['foo', 'bar']
 -      _ = p | beam.Create(input_data) | WriteToTFRecord(
 -          file_path_prefix, compression_type=CompressionTypes.GZIP)
 -
 -    actual = []
 -    file_name = glob.glob(file_path_prefix + '-*')[0]
 -    for r in tf.python_io.tf_record_iterator(
 -        file_name, options=tf.python_io.TFRecordOptions(
 -            tf.python_io.TFRecordCompressionType.GZIP)):
 -      actual.append(r)
 -    self.assertEqual(actual, input_data)
 +    with TempDir() as temp_dir:
 +      file_path_prefix = temp_dir.create_temp_file('result')
 +      with TestPipeline() as p:
 +        input_data = ['foo', 'bar']
 +        _ = p | beam.Create(input_data) | WriteToTFRecord(
 +            file_path_prefix, compression_type=CompressionTypes.GZIP)
 +
 +      actual = []
 +      file_name = glob.glob(file_path_prefix + '-*')[0]
 +      for r in tf.python_io.tf_record_iterator(
 +          file_name, options=tf.python_io.TFRecordOptions(
 +              tf.python_io.TFRecordCompressionType.GZIP)):
 +        actual.append(r)
 +      self.assertEqual(actual, input_data)
  
    def test_write_record_auto(self):
 -    file_path_prefix = os.path.join(self._new_tempdir(), 'result')
 -    with TestPipeline() as p:
 -      input_data = ['foo', 'bar']
 -      _ = p | beam.Create(input_data) | WriteToTFRecord(
 -          file_path_prefix, file_name_suffix='.gz')
 +    with TempDir() as temp_dir:
 +      file_path_prefix = temp_dir.create_temp_file('result')
 +      with TestPipeline() as p:
 +        input_data = ['foo', 'bar']
 +        _ = p | beam.Create(input_data) | WriteToTFRecord(
 +            file_path_prefix, file_name_suffix='.gz')
  
 -    actual = []
 -    file_name = glob.glob(file_path_prefix + '-*.gz')[0]
 -    for r in tf.python_io.tf_record_iterator(
 -        file_name, options=tf.python_io.TFRecordOptions(
 -            tf.python_io.TFRecordCompressionType.GZIP)):
 -      actual.append(r)
 -    self.assertEqual(actual, input_data)
 +      actual = []
 +      file_name = glob.glob(file_path_prefix + '-*.gz')[0]
 +      for r in tf.python_io.tf_record_iterator(
 +          file_name, options=tf.python_io.TFRecordOptions(
 +              tf.python_io.TFRecordCompressionType.GZIP)):
 +        actual.append(r)
 +      self.assertEqual(actual, input_data)
  
  
 -class TestTFRecordSource(_TestCaseWithTempDirCleanUp):
 -
 -  def _write_file(self, path, base64_records):
 -    record = binascii.a2b_base64(base64_records)
 -    with open(path, 'wb') as f:
 -      f.write(record)
 -
 -  def _write_file_gzip(self, path, base64_records):
 -    record = binascii.a2b_base64(base64_records)
 -    with gzip.GzipFile(path, 'wb') as f:
 -      f.write(record)
 +class TestReadFromTFRecord(unittest.TestCase):
  
    def test_process_single(self):
 -    path = os.path.join(self._new_tempdir(), 'result')
 -    self._write_file(path, FOO_RECORD_BASE64)
 -    with TestPipeline() as p:
 -      result = (p
 -                | beam.io.Read(
 -                    _TFRecordSource(
 -                        path,
 -                        coder=coders.BytesCoder(),
 -                        compression_type=CompressionTypes.AUTO,
 -                        validate=True)))
 -      assert_that(result, equal_to(['foo']))
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file(path, FOO_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | ReadFromTFRecord(
 +                      path,
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO,
 +                      validate=True))
 +        assert_that(result, equal_to(['foo']))
  
    def test_process_multiple(self):
 -    path = os.path.join(self._new_tempdir(), 'result')
 -    self._write_file(path, FOO_BAR_RECORD_BASE64)
 -    with TestPipeline() as p:
 -      result = (p
 -                | beam.io.Read(
 -                    _TFRecordSource(
 -                        path,
 -                        coder=coders.BytesCoder(),
 -                        compression_type=CompressionTypes.AUTO,
 -                        validate=True)))
 -      assert_that(result, equal_to(['foo', 'bar']))
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | ReadFromTFRecord(
 +                      path,
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO,
 +                      validate=True))
 +        assert_that(result, equal_to(['foo', 'bar']))
  
    def test_process_gzip(self):
 -    path = os.path.join(self._new_tempdir(), 'result')
 -    self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 -    with TestPipeline() as p:
 -      result = (p
 -                | beam.io.Read(
 -                    _TFRecordSource(
 -                        path,
 -                        coder=coders.BytesCoder(),
 -                        compression_type=CompressionTypes.GZIP,
 -                        validate=True)))
 -      assert_that(result, equal_to(['foo', 'bar']))
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | ReadFromTFRecord(
 +                      path,
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.GZIP,
 +                      validate=True))
 +        assert_that(result, equal_to(['foo', 'bar']))
  
    def test_process_auto(self):
 -    path = os.path.join(self._new_tempdir(), 'result.gz')
 -    self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 -    with TestPipeline() as p:
 -      result = (p
 -                | beam.io.Read(
 -                    _TFRecordSource(
 -                        path,
 -                        coder=coders.BytesCoder(),
 -                        compression_type=CompressionTypes.AUTO,
 -                        validate=True)))
 -      assert_that(result, equal_to(['foo', 'bar']))
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result.gz')
 +      _write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | ReadFromTFRecord(
 +                      path,
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO,
 +                      validate=True))
 +        assert_that(result, equal_to(['foo', 'bar']))
 +
 +  def test_process_gzip(self):
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | ReadFromTFRecord(
 +                      path, compression_type=CompressionTypes.GZIP))
 +        assert_that(result, equal_to(['foo', 'bar']))
 +
 +  def test_process_gzip_auto(self):
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result.gz')
 +      _write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | ReadFromTFRecord(
 +                      path, compression_type=CompressionTypes.AUTO))
 +        assert_that(result, equal_to(['foo', 'bar']))
 +
  
 +class TestReadAllFromTFRecord(unittest.TestCase):
  
 -class TestReadFromTFRecordSource(TestTFRecordSource):
 +  def _write_glob(self, temp_dir, suffix):
 +    for _ in range(3):
 +      path = temp_dir.create_temp_file(suffix)
 +      _write_file(path, FOO_BAR_RECORD_BASE64)
 +
 +  def test_process_single(self):
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file(path, FOO_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | Create([path])
 +                  | ReadAllFromTFRecord(
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO))
 +        assert_that(result, equal_to(['foo']))
 +
 +  def test_process_multiple(self):
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | Create([path])
 +                  | ReadAllFromTFRecord(
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO))
 +        assert_that(result, equal_to(['foo', 'bar']))
 +
 +  def test_process_glob(self):
 +    with TempDir() as temp_dir:
 +      self._write_glob(temp_dir, 'result')
 +      glob = temp_dir.get_path() + os.path.sep + '*result'
 +      with TestPipeline() as p:
 +        result = (p
 +                  | Create([glob])
 +                  | ReadAllFromTFRecord(
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO))
 +        assert_that(result, equal_to(['foo', 'bar'] * 3))
 +
 +  def test_process_multiple_globs(self):
 +    with TempDir() as temp_dir:
 +      globs = []
 +      for i in range(3):
 +        suffix = 'result' + str(i)
 +        self._write_glob(temp_dir, suffix)
 +        globs.append(temp_dir.get_path() + os.path.sep + '*' + suffix)
 +
 +      with TestPipeline() as p:
 +        result = (p
 +                  | Create(globs)
 +                  | ReadAllFromTFRecord(
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO))
 +        assert_that(result, equal_to(['foo', 'bar'] * 9))
  
    def test_process_gzip(self):
 -    path = os.path.join(self._new_tempdir(), 'result')
 -    self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 -    with TestPipeline() as p:
 -      result = (p
 -                | ReadFromTFRecord(
 -                    path, compression_type=CompressionTypes.GZIP))
 -      assert_that(result, equal_to(['foo', 'bar']))
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result')
 +      _write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | Create([path])
 +                  | ReadAllFromTFRecord(
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.GZIP))
 +        assert_that(result, equal_to(['foo', 'bar']))
  
 -  def test_process_gzip_auto(self):
 -    path = os.path.join(self._new_tempdir(), 'result.gz')
 -    self._write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 -    with TestPipeline() as p:
 -      result = (p
 -                | ReadFromTFRecord(
 -                    path, compression_type=CompressionTypes.AUTO))
 -      assert_that(result, equal_to(['foo', 'bar']))
 +  def test_process_auto(self):
 +    with TempDir() as temp_dir:
 +      path = temp_dir.create_temp_file('result.gz')
 +      _write_file_gzip(path, FOO_BAR_RECORD_BASE64)
 +      with TestPipeline() as p:
 +        result = (p
 +                  | Create([path])
 +                  | ReadAllFromTFRecord(
 +                      coder=coders.BytesCoder(),
 +                      compression_type=CompressionTypes.AUTO))
 +        assert_that(result, equal_to(['foo', 'bar']))
  
  
 -class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp):
 +class TestEnd2EndWriteAndRead(unittest.TestCase):
  
    def create_inputs(self):
-     input_array = [[random.random() - 0.5 for _ in xrange(15)]
-                    for _ in xrange(12)]
+     input_array = [[random.random() - 0.5 for _ in range(15)]
+                    for _ in range(12)]
      memfile = cStringIO.StringIO()
      pickle.dump(input_array, memfile)
      return memfile.getvalue()

-- 
To stop receiving notification emails like this one, please contact
rober...@apache.org.

Reply via email to