[
https://issues.apache.org/jira/browse/BEAM-5315?focusedWorklogId=155643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155643
]
ASF GitHub Bot logged work on BEAM-5315:
----------------------------------------
Author: ASF GitHub Bot
Created on: 17/Oct/18 23:19
Start Date: 17/Oct/18 23:19
Worklog Time Spent: 10m
Work Description: manuzhang closed pull request #6728: [BEAM-5315]
Partially port IO, fixing well-documented errors
URL: https://github.com/apache/beam/pull/6728
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/python/apache_beam/io/avroio_test.py
b/sdks/python/apache_beam/io/avroio_test.py
index 6346c9b1b65..b7a2e1b67dc 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -363,7 +363,7 @@ def test_corrupted_file(self):
# the last sync_marker.
last_char_index = len(data) - 1
corrupted_data = data[:last_char_index]
- corrupted_data += 'A' if data[last_char_index] == 'B' else 'B'
+ corrupted_data += b'A' if data[last_char_index] == b'B' else b'B'
with tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template) as f:
f.write(corrupted_data)
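A minimal, illustrative sketch of the Python 3 bytes semantics this hunk
touches (not part of the patch): indexing a bytes object yields an int on
Python 3, while slicing yields bytes, so comparing a single indexed byte
against a one-byte bytes literal only behaves as intended on Python 2.

    data = b'ABC'
    assert data[2] == 67        # ord('C'); indexing bytes gives an int on Py3
    assert data[2:] == b'C'     # slicing gives a one-byte bytes object
    # Hence `data[i] == b'C'` is always False on Python 3; compare a slice
    # (data[i:i+1] == b'C') or an ordinal (data[i] == ord('C')) instead.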
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py
b/sdks/python/apache_beam/io/filebasedsink_test.py
index 39e2b8377fd..5069da593bf 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -76,24 +76,20 @@ def _create_temp_file(self, name='', suffix=''):
class MyFileBasedSink(filebasedsink.FileBasedSink):
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.'
- 'TODO: BEAM-5627, TODO:5618')
def open(self, temp_path):
# TODO: Fix main session pickling.
# file_handle = super(MyFileBasedSink, self).open(temp_path)
file_handle = filebasedsink.FileBasedSink.open(self, temp_path)
- file_handle.write('[start]')
+ file_handle.write(b'[start]')
return file_handle
def write_encoded_record(self, file_handle, encoded_value):
- file_handle.write('[')
+ file_handle.write(b'[')
file_handle.write(encoded_value)
- file_handle.write(']')
+ file_handle.write(b']')
def close(self, file_handle):
- file_handle.write('[end]')
+ file_handle.write(b'[end]')
# TODO: Fix main session pickling.
# file_handle = super(MyFileBasedSink, self).close(file_handle)
file_handle = filebasedsink.FileBasedSink.close(self, file_handle)
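A minimal sketch of the rule behind this hunk (illustrative, not part of
the patch): on Python 3 a binary file handle accepts bytes only, so the
sink's start/end markers must be bytes literals rather than str.

    import tempfile

    with tempfile.NamedTemporaryFile() as f:  # opened in binary mode ('w+b')
        f.write(b'[start]')                   # OK: bytes into a binary handle
        # f.write('[start]')  # Python 3: TypeError: a bytes-like object is required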
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py
b/sdks/python/apache_beam/io/filebasedsource_test.py
index bedbf46c9e4..4056827527c 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -74,7 +74,7 @@ def read_records(self, file_name, range_tracker):
while line:
if not range_tracker.try_claim(current):
return
- yield line.rstrip('\n')
+ yield line.rstrip(b'\n')
current += len(line)
line = f.readline()
finally:
@@ -186,6 +186,12 @@ def read(self, range_tracker):
def estimate_size(self):
return len(self._values) # Assuming each value to be 1 byte.
+ @classmethod
+ def setUpClass(cls):
+ # Method has been renamed in Python 3
+ if sys.version_info[0] < 3:
+ cls.assertCountEqual = cls.assertItemsEqual
+
def setUp(self):
# Reducing the size of thread pools. Without this test execution may fail in
# environments with limited amount of resources.
@@ -197,7 +203,7 @@ def test_read(self):
concat = ConcatSource(sources)
range_tracker = concat.get_range_tracker(None, None)
read_data = [value for value in concat.read(range_tracker)]
- self.assertItemsEqual(list(range(30)), read_data)
+ self.assertCountEqual(list(range(30)), read_data)
def test_split(self):
sources = [TestConcatSource.DummySource(list(range(start, start + 10)))
@@ -214,7 +220,7 @@ def test_split(self):
split.stop_position)
read_data.extend([value for value in split.source.read(
range_tracker_for_split)])
- self.assertItemsEqual(list(range(30)), read_data)
+ self.assertCountEqual(list(range(30)), read_data)
def test_estimate_size(self):
sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
@@ -225,6 +231,12 @@ def test_estimate_size(self):
class TestFileBasedSource(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # Method has been renamed in Python 3
+ if sys.version_info[0] < 3:
+ cls.assertCountEqual = cls.assertItemsEqual
+
def setUp(self):
# Reducing the size of thread pools. Without this test execution may fail in
# environments with limited amount of resources.
@@ -277,7 +289,7 @@ def test_fully_read_single_file(self):
fbs = LineSource(file_name)
range_tracker = fbs.get_range_tracker(None, None)
read_data = [record for record in fbs.read(range_tracker)]
- self.assertItemsEqual(expected_data, read_data)
+ self.assertCountEqual(expected_data, read_data)
def test_single_file_display_data(self):
file_name, _ = write_data(10)
@@ -295,7 +307,7 @@ def test_fully_read_file_pattern(self):
fbs = LineSource(pattern)
range_tracker = fbs.get_range_tracker(None, None)
read_data = [record for record in fbs.read(range_tracker)]
- self.assertItemsEqual(expected_data, read_data)
+ self.assertCountEqual(expected_data, read_data)
def test_fully_read_file_pattern_with_empty_files(self):
pattern, expected_data = write_pattern([5, 0, 12, 0, 8, 0])
@@ -303,7 +315,7 @@ def test_fully_read_file_pattern_with_empty_files(self):
fbs = LineSource(pattern)
range_tracker = fbs.get_range_tracker(None, None)
read_data = [record for record in fbs.read(range_tracker)]
- self.assertItemsEqual(expected_data, read_data)
+ self.assertCountEqual(expected_data, read_data)
def test_estimate_size_of_file(self):
file_name, expected_data = write_data(10)
@@ -375,7 +387,7 @@ def test_read_splits_single_file(self):
data_from_split = [data for data in source.read(range_tracker)]
read_data.extend(data_from_split)
- self.assertItemsEqual(expected_data, read_data)
+ self.assertCountEqual(expected_data, read_data)
def test_read_splits_file_pattern(self):
pattern, expected_data = write_pattern([34, 66, 40, 24, 24, 12])
@@ -392,7 +404,7 @@ def test_read_splits_file_pattern(self):
data_from_split = [data for data in source.read(range_tracker)]
read_data.extend(data_from_split)
- self.assertItemsEqual(expected_data, read_data)
+ self.assertCountEqual(expected_data, read_data)
def _run_source_test(self, pattern, expected_data, splittable=True):
pipeline = TestPipeline()
@@ -554,7 +566,7 @@ def test_read_auto_pattern_compressed_and_uncompressed(self):
f.write('\n'.join(c))
chunks_to_write.append(out.getvalue())
else:
- chunks_to_write.append('\n'.join(c))
+ chunks_to_write.append(b'\n'.join(c))
file_pattern = write_prepared_pattern(chunks_to_write,
suffixes=(['.gz', '']*3))
pipeline = TestPipeline()
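The setUpClass additions above use a small compatibility shim: unittest's
assertItemsEqual (Python 2) was renamed to assertCountEqual (Python 3), so
the tests call the Python 3 name and alias it back on Python 2. A
self-contained, illustrative version (class and test names are
hypothetical):

    import sys
    import unittest

    class CompatTest(unittest.TestCase):

        @classmethod
        def setUpClass(cls):
            # Alias the Python 3 name onto Python 2's implementation.
            if sys.version_info[0] < 3:
                cls.assertCountEqual = cls.assertItemsEqual

        def test_same_elements_any_order(self):
            # Order is ignored; element counts must match.
            self.assertCountEqual([3, 1, 2, 2], [2, 1, 2, 3])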
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py
b/sdks/python/apache_beam/io/filesystemio_test.py
index 6db78e7d2cc..f11e1c45623 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -182,7 +182,7 @@ def _read_and_verify(self, stream, expected, buffer_size):
def test_pipe_stream(self):
block_sizes = list(4**i for i in range(0, 12))
data_blocks = list(os.urandom(size) for size in block_sizes)
- expected = ''.join(data_blocks)
+ expected = b''.join(data_blocks)
buffer_sizes = [100001, 512 * 1024, 1024 * 1024]
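For context (an illustrative sketch, not part of the patch): os.urandom
returns bytes on both Python versions, but a str joiner rejects bytes
items on Python 3, so the joiner itself must be bytes.

    import os

    blocks = [os.urandom(4), os.urandom(8)]
    joined = b''.join(blocks)   # works on Python 2 and 3
    # ''.join(blocks)           # Python 3: TypeError, str joiner with bytes items
    assert len(joined) == 12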
diff --git a/sdks/python/apache_beam/io/textio_test.py
b/sdks/python/apache_beam/io/textio_test.py
index 218e154c640..2ed3b489494 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -487,7 +487,7 @@ def test_read_corrupted_bzip2_fails(self):
with TempDir() as tempdir:
file_name = tempdir.create_temp_file()
with bz2.BZ2File(file_name, 'wb') as f:
- f.write('\n'.join(lines))
+ f.write(b'\n'.join(lines))
with open(file_name, 'wb') as f:
f.write('corrupt')
@@ -566,7 +566,7 @@ def test_read_corrupted_gzip_fails(self):
with TempDir() as tempdir:
file_name = tempdir.create_temp_file()
with gzip.GzipFile(file_name, 'wb') as f:
- f.write('\n'.join(lines))
+ f.write(b'\n'.join(lines))
with open(file_name, 'wb') as f:
f.write('corrupt')
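Behind both hunks above (a hedged sketch, not part of the patch):
bz2.BZ2File and gzip.GzipFile expose binary streams, which on Python 3
accept bytes only, so the lines are joined with b'\n' before writing.

    import gzip
    import tempfile

    with tempfile.NamedTemporaryFile(suffix='.gz') as tmp:
        with gzip.GzipFile(tmp.name, 'wb') as f:
            f.write(b'\n'.join([b'line1', b'line2']))
        with gzip.GzipFile(tmp.name, 'rb') as f:
            assert f.read() == b'line1\nline2'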
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py
b/sdks/python/apache_beam/io/tfrecordio_test.py
index f32d46a8cc0..6421fa5c591 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -132,7 +132,7 @@ def test_write_record(self):
def test_read_record(self):
actual = _TFRecordUtil.read_record(self._as_file_handle(self.record))
- self.assertEqual('foo', actual)
+ self.assertEqual(b'foo', actual)
def test_read_record_invalid_record(self):
self._test_error('bar', 'Not a valid TFRecord. Fewer than 12 bytes')
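Why the expected value becomes b'foo' (illustrative note, not part of the
patch): on Python 2, b'foo' is just str, so 'foo' == b'foo' holds; on
Python 3, str and bytes are distinct types that never compare equal, so a
record read from a binary stream must be checked against a bytes literal.

    record = b'foo'             # what a binary read returns on Python 3
    assert record == b'foo'     # passes on both versions
    # On Python 3: 'foo' == b'foo' evaluates to False, so
    # assertEqual('foo', actual) would fail for bytes input.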
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 155643)
Time Spent: 4.5h (was: 4h 20m)
> Finish Python 3 porting for io module
> -------------------------------------
>
> Key: BEAM-5315
> URL: https://issues.apache.org/jira/browse/BEAM-5315
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Robbe
> Assignee: Simon
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)