[
https://issues.apache.org/jira/browse/BEAM-5315?focusedWorklogId=152895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152895
]
ASF GitHub Bot logged work on BEAM-5315:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Oct/18 19:58
Start Date: 09/Oct/18 19:58
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #6590: [BEAM-5315] Partially port io
URL: https://github.com/apache/beam/pull/6590
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 55e5a16dff8..39e2b8377fd 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -76,6 +76,10 @@ def _create_temp_file(self, name='', suffix=''):
class MyFileBasedSink(filebasedsink.FileBasedSink):
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3.'
+ 'TODO: BEAM-5627, TODO:5618')
def open(self, temp_path):
# TODO: Fix main session pickling.
# file_handle = super(MyFileBasedSink, self).open(temp_path)
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 0e57a04356d..bedbf46c9e4 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -52,6 +52,10 @@
class LineSource(FileBasedSource):
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def read_records(self, file_name, range_tracker):
f = self.open_file(file_name)
try:
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index 876ba7a38b8..c3ab88cb386 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -26,6 +26,7 @@
import ntpath
import os
import posixpath
+import sys
import tempfile
import unittest
from builtins import range
@@ -287,6 +288,10 @@ def _create_temp_file(self):
self._tempfiles.append(path)
return path
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def _create_compressed_file(self, compression_type, content):
file_name = self._create_temp_file()
@@ -427,6 +432,10 @@ def test_read_and_seek_back_to_beginning(self):
self.assertEqual(first_pass, second_pass)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_tell(self):
lines = ['line%d\n' % i for i in range(10)]
tmpfile = self._create_temp_file()
diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py
index 086ae164aef..161f50f9331 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -214,7 +214,7 @@ def __init__(self, recv_pipe):
self.conn = recv_pipe
self.closed = False
self.position = 0
- self.remaining = ''
+ self.remaining = b''
def read(self, size):
"""Read data from the wrapped pipe connection.
@@ -239,7 +239,7 @@ def read(self, size):
self.remaining = self.conn.recv_bytes()
except EOFError:
break
- return ''.join(data_list)
+ return b''.join(data_list)
def tell(self):
"""Tell the file's current offset.
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index 75079a539c4..6db78e7d2cc 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -22,6 +22,7 @@
import logging
import multiprocessing
import os
+import sys
import threading
import unittest
from builtins import range
@@ -74,9 +75,9 @@ def test_file_attributes(self):
self.assertTrue(stream.seekable())
def test_read_empty(self):
- downloader = FakeDownloader(data='')
+ downloader = FakeDownloader(data=b'')
stream = filesystemio.DownloaderStream(downloader)
- self.assertEqual(stream.read(), '')
+ self.assertEqual(stream.read(), b'')
def test_read(self):
data = 'abcde'
@@ -89,6 +90,10 @@ def test_read(self):
self.assertEqual(stream.read(), data[1:])
self.assertEqual(downloader.last_read_size, len(data) - 1)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_read_buffered(self):
data = 'abcde'
downloader = FakeDownloader(data)
@@ -102,6 +107,10 @@ def test_read_buffered(self):
self.assertEqual(stream.read(), data[1:])
+@unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
class TestUploaderStream(unittest.TestCase):
def test_file_attributes(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 3e3f51762f4..64872532db3 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -19,6 +19,7 @@
from __future__ import absolute_import
import errno
+import os
import random
import sys
import unittest
@@ -28,9 +29,12 @@
from mock import MagicMock
# pylint: disable=ungrouped-imports
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.testing.test_utils import patch_retry
+try: # TODO(BEAM-4543): googledatastore dependency does not work on Python 3.
+ from apache_beam.io.gcp.datastore.v1 import fake_datastore
+ from apache_beam.io.gcp.datastore.v1 import helper
+ from apache_beam.testing.test_utils import patch_retry
+except ImportError:
+ pass
# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -48,6 +52,10 @@
# pylint: enable=ungrouped-imports
+@unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-4543')
@unittest.skipIf(datastore_helper is None, 'GCP dependencies are not installed')
class HelperTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
index c5bfdd81cf8..617d317c788 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
@@ -19,13 +19,19 @@
from __future__ import absolute_import
+import os
+import sys
import unittest
from mock import MagicMock
from mock import call
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import query_splitter
+# pylint: disable=ungrouped-imports
+try: # TODO(BEAM-4543): googledatastore dependency does not work on Python 3.
+ from apache_beam.io.gcp.datastore.v1 import fake_datastore
+ from apache_beam.io.gcp.datastore.v1 import query_splitter
+except ImportError:
+ pass
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -36,8 +42,13 @@
except ImportError:
datastore_pb2 = None
# pylint: enable=wrong-import-order, wrong-import-position
+# pylint: enable=ungrouped-imports
+@unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-4543')
@unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
class QuerySplitterTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index e0af7c962b5..10789d00e22 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -23,6 +23,7 @@
import logging
import os
import random
+import sys
import unittest
from builtins import object
from builtins import range
@@ -117,11 +118,15 @@ def get_range_callback(start, end):
download.GetRange = get_range_callback
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def Insert(self, insert_request, upload=None): # pylint: disable=invalid-name
assert upload is not None
generation = self.get_last_generation(insert_request.bucket,
insert_request.name) + 1
- f = FakeFile(insert_request.bucket, insert_request.name, '', generation)
+ f = FakeFile(insert_request.bucket, insert_request.name, b'', generation)
# Stream data into file.
stream = upload.stream
@@ -131,7 +136,7 @@ def Insert(self, insert_request, upload=None): # pylint: disable=invalid-name
if not data:
break
data_list.append(data)
- f.contents = ''.join(data_list)
+ f.contents = b''.join(data_list)
self.add_file(f)
@@ -484,7 +489,7 @@ def test_full_file_read(self):
self.assertEqual(f.mode, 'r')
f.seek(0, os.SEEK_END)
self.assertEqual(f.tell(), file_size)
- self.assertEqual(f.read(), '')
+ self.assertEqual(f.read(), b'')
f.seek(0)
self.assertEqual(f.read(), random_file.contents)
@@ -505,6 +510,10 @@ def test_file_random_seek(self):
f.read(end - start + 1), random_file.contents[start:end + 1])
self.assertEqual(f.tell(), end + 1)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_file_iterator(self):
file_name = 'gs://gcsio-test/iterating_file'
lines = []
@@ -526,6 +535,10 @@ def test_file_iterator(self):
self.assertEqual(read_lines, line_count)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_file_read_line(self):
file_name = 'gs://gcsio-test/read_line_file'
lines = []
@@ -578,6 +591,10 @@ def test_file_read_line(self):
f.seek(start)
self.assertEqual(f.readline(), lines[line_index][chars_left:])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_file_write(self):
file_name = 'gs://gcsio-test/write_file'
file_size = 5 * 1024 * 1024 + 2000
@@ -592,6 +609,10 @@ def test_file_write(self):
self.assertEqual(
self.client.objects.get_file(bucket, name).contents, contents)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_file_close(self):
file_name = 'gs://gcsio-test/close_file'
file_size = 5 * 1024 * 1024 + 2000
@@ -605,6 +626,10 @@ def test_file_close(self):
self.assertEqual(
self.client.objects.get_file(bucket, name).contents, contents)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_file_flush(self):
file_name = 'gs://gcsio-test/flush_file'
file_size = 5 * 1024 * 1024 + 2000
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index 8421c43e629..3dd94b7ed8c 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -21,6 +21,7 @@
import io
import logging
+import os
import posixpath
import sys
import unittest
@@ -321,6 +322,10 @@ def test_create_success(self):
expected_file = FakeFile(url, 'wb')
self.assertEqual(self._fake_hdfs.files[url], expected_file)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_create_write_read_compressed(self):
url = self.fs.join(self.tmpdir, 'new_file.gz')
@@ -358,6 +363,10 @@ def _cmpfiles(self, url1, url2):
data2 = f2.read()
return data1 == data2
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_copy_file(self):
url1 = self.fs.join(self.tmpdir, 'new_file1')
url2 = self.fs.join(self.tmpdir, 'new_file2')
@@ -368,6 +377,10 @@ def test_copy_file(self):
self.assertTrue(self._cmpfiles(url1, url2))
self.assertTrue(self._cmpfiles(url1, url3))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_copy_file_overwrite_error(self):
url1 = self.fs.join(self.tmpdir, 'new_file1')
url2 = self.fs.join(self.tmpdir, 'new_file2')
@@ -379,6 +392,10 @@ def test_copy_file_overwrite_error(self):
BeamIOError, r'already exists.*%s' % posixpath.basename(url2)):
self.fs.copy([url1], [url2])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_copy_file_error(self):
url1 = self.fs.join(self.tmpdir, 'new_file1')
url2 = self.fs.join(self.tmpdir, 'new_file2')
@@ -392,6 +409,10 @@ def test_copy_file_error(self):
self.fs.copy([url1, url3], [url2, url4])
self.assertTrue(self._cmpfiles(url3, url4))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_copy_directory(self):
url_t1 = self.fs.join(self.tmpdir, 't1')
url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
@@ -409,6 +430,10 @@ def test_copy_directory(self):
self.fs.copy([url_t1], [url_t2])
self.assertTrue(self._cmpfiles(url1, url2))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_copy_directory_overwrite_error(self):
url_t1 = self.fs.join(self.tmpdir, 't1')
url_t1_inner = self.fs.join(self.tmpdir, 't1/inner')
@@ -433,6 +458,10 @@ def test_copy_directory_overwrite_error(self):
with self.assertRaisesRegexp(BeamIOError, r'already exists'):
self.fs.copy([url_t1], [url_t2])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_rename_file(self):
url1 = self.fs.join(self.tmpdir, 'f1')
url2 = self.fs.join(self.tmpdir, 'f2')
@@ -443,6 +472,10 @@ def test_rename_file(self):
self.assertFalse(self.fs.exists(url1))
self.assertTrue(self.fs.exists(url2))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_rename_file_error(self):
url1 = self.fs.join(self.tmpdir, 'f1')
url2 = self.fs.join(self.tmpdir, 'f2')
@@ -457,6 +490,10 @@ def test_rename_file_error(self):
self.assertFalse(self.fs.exists(url3))
self.assertTrue(self.fs.exists(url4))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_rename_directory(self):
url_t1 = self.fs.join(self.tmpdir, 't1')
url_t2 = self.fs.join(self.tmpdir, 't2')
@@ -478,12 +515,20 @@ def test_exists(self):
self.assertTrue(self.fs.exists(url1))
self.assertFalse(self.fs.exists(url2))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_size(self):
url = self.fs.join(self.tmpdir, 'f1')
with self.fs.create(url) as f:
f.write(b'Hello')
self.assertEqual(5, self.fs.size(url))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_checksum(self):
url = self.fs.join(self.tmpdir, 'f1')
with self.fs.create(url) as f:
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index 475510e5d1f..de578b8a032 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -22,6 +22,8 @@
import copy
import logging
import math
+import os
+import sys
import unittest
from past.builtins import long
@@ -324,17 +326,32 @@ def _check(self, fraction=None, key=None, start=None, end=None, delta=0):
self.assertEqual(computed_fraction, fraction, str(locals()))
self.assertEqual(computed_key, key, str(locals()))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be '
+ 'fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_key_to_fraction_no_endpoints(self):
self._check(key='\x07', fraction=7/256.)
self._check(key='\xFF', fraction=255/256.)
self._check(key='\x01\x02\x03', fraction=(2**16 + 2**9 + 3) / (2.0**24))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be '
+ 'fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_key_to_fraction(self):
self._check(key='\x87', start='\x80', fraction=7/128.)
self._check(key='\x07', end='\x10', fraction=7/16.)
self._check(key='\x47', start='\x40', end='\x80', fraction=7/64.)
self._check(key='\x47\x80', start='\x40', end='\x80', fraction=15/128.)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be '
+ 'fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_key_to_fraction_common_prefix(self):
self._check(
key='a' * 100 + 'b', start='a' * 100 + 'a', end='a' * 100 + 'c',
@@ -349,6 +366,11 @@ def test_key_to_fraction_common_prefix(self):
end='foob\x00\x00\x00\x00\x00\x00\x00\x00\x02',
fraction=0.5)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be '
+ 'fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_tiny(self):
self._check(fraction=.5**20, key='\0\0\x10')
self._check(fraction=.5**20, start='a', end='b', key='a\0\0\x10')
@@ -363,6 +385,11 @@ def test_tiny(self):
delta=1e-15)
self._check(fraction=.5**100, key='\0' * 12 + '\x10')
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be '
+ 'fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_lots(self):
for fraction in (0, 1, .5, .75, 7./512, 1 - 7./4096):
self._check(fraction)
@@ -384,6 +411,11 @@ def test_lots(self):
self._check(fraction, start='a' * 100 + '\x80', end='a' * 100 + '\x81',
delta=1e-14)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be '
+ 'fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_good_prec(self):
# There should be about 7 characters (~53 bits) of precision
# (beyond the common prefix of start and end).
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py b/sdks/python/apache_beam/io/source_test_utils_test.py
index 38a2e8eff74..1dd09c0f0cf 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -18,6 +18,7 @@
from __future__ import absolute_import
import logging
+import os
import sys
import tempfile
import unittest
@@ -54,12 +55,20 @@ def _create_source(self, data):
for bundle in source.split(float('inf')):
return bundle.source
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_read_from_source(self):
data = self._create_data(100)
source = self._create_source(data)
self.assertCountEqual(
data, source_test_utils.read_from_source(source, None, None))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_source_equals_reference_source(self):
data = self._create_data(100)
reference_source = self._create_source(data)
@@ -73,6 +82,10 @@ def test_source_equals_reference_source(self):
source_test_utils.assert_sources_equal_reference_source(
(reference_source, None, None), sources_info)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_split_at_fraction_successful(self):
data = self._create_data(100)
source = self._create_source(data)
@@ -97,6 +110,10 @@ def test_split_at_fraction_successful(self):
self.assertTrue(result1[0] < result3[0])
self.assertTrue(result1[1] > result3[1])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_split_at_fraction_fails(self):
data = self._create_data(100)
source = self._create_source(data)
@@ -110,6 +127,10 @@ def test_split_at_fraction_fails(self):
source_test_utils.assert_split_at_fraction_behavior(
source, 10, 0.5, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_split_at_fraction_binary(self):
data = self._create_data(100)
source = self._create_source(data)
@@ -122,6 +143,10 @@ def test_split_at_fraction_binary(self):
self.assertTrue(stats.successful_fractions)
self.assertTrue(stats.non_trivial_fractions)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_split_at_fraction_exhaustive(self):
data = self._create_data(10)
source = self._create_source(data)
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index cba8922976e..1e7de57bd9c 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -95,6 +95,10 @@ def _create_temp_file(self, contents):
f.write(contents)
return f.name
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_read_from_source(self):
file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
@@ -104,6 +108,10 @@ def test_read_from_source(self):
self.assertCountEqual(['aaaa', 'bbbb', 'cccc', 'dddd'], result)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_run_direct(self):
file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
pipeline = TestPipeline()
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index adb96d1b8d1..1bc54eeca35 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -497,6 +497,10 @@ def test_read_corrupted_bzip2_fails(self):
with self.assertRaises(Exception):
pipeline.run()
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_read_bzip2_concat(self):
with TempDir() as tempdir:
bzip2_file_name1 = tempdir.create_temp_file()
@@ -819,6 +823,10 @@ def _write_lines(self, sink, lines):
sink.write_record(f, line)
sink.close(f)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_text_file(self):
sink = TextSink(self.path)
self._write_lines(sink, self.lines)
@@ -833,6 +841,10 @@ def test_write_text_file_empty(self):
with open(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_bzip2_file(self):
sink = TextSink(
self.path, compression_type=CompressionTypes.BZIP2)
@@ -841,6 +853,10 @@ def test_write_bzip2_file(self):
with bz2.BZ2File(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_bzip2_file_auto(self):
self.path = self._create_temp_file(suffix='.bz2')
sink = TextSink(self.path)
@@ -849,6 +865,10 @@ def test_write_bzip2_file_auto(self):
with bz2.BZ2File(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_gzip_file(self):
sink = TextSink(
self.path, compression_type=CompressionTypes.GZIP)
@@ -857,6 +877,10 @@ def test_write_gzip_file(self):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_gzip_file_auto(self):
self.path = self._create_temp_file(suffix='.gz')
sink = TextSink(self.path)
@@ -865,6 +889,10 @@ def test_write_gzip_file_auto(self):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), self.lines)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_gzip_file_empty(self):
sink = TextSink(
self.path, compression_type=CompressionTypes.GZIP)
@@ -873,6 +901,10 @@ def test_write_gzip_file_empty(self):
with gzip.GzipFile(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), [])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_text_file_with_header(self):
header = 'header1\nheader2'
sink = TextSink(self.path, header=header)
@@ -881,6 +913,10 @@ def test_write_text_file_with_header(self):
with open(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), header.splitlines() + self.lines)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_text_file_empty_with_header(self):
header = 'header1\nheader2'
sink = TextSink(self.path, header=header)
@@ -889,6 +925,10 @@ def test_write_text_file_empty_with_header(self):
with open(self.path, 'r') as f:
self.assertEqual(f.read().splitlines(), header.splitlines())
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_write_dataflow(self):
pipeline = TestPipeline()
pcoll = pipeline | beam.core.Create(self.lines)
diff --git a/sdks/python/apache_beam/io/vcfio_test.py b/sdks/python/apache_beam/io/vcfio_test.py
index 25b5d0cfa09..22d23c64718 100644
--- a/sdks/python/apache_beam/io/vcfio_test.py
+++ b/sdks/python/apache_beam/io/vcfio_test.py
@@ -21,6 +21,7 @@
import logging
import os
+import sys
import unittest
from itertools import chain
from itertools import permutations
@@ -287,6 +288,10 @@ def test_read_file_pattern_large(self):
os.path.join(get_full_dir(), 'valid-*.vcf.gz'))
self.assertEqual(9900, len(read_data_gz))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_single_file_no_records(self):
self.assertEqual(
[], self._create_temp_file_and_read_records(['']))
@@ -295,6 +300,10 @@ def test_single_file_no_records(self):
self.assertEqual(
[], self._create_temp_file_and_read_records(_SAMPLE_HEADER_LINES))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_single_file_verify_details(self):
variant_1, vcf_line_1 = self._get_sample_variant_1()
read_data = self._create_temp_file_and_read_records(
@@ -308,6 +317,10 @@ def test_single_file_verify_details(self):
self.assertEqual(3, len(read_data))
self._assert_variants_equal([variant_1, variant_2, variant_3], read_data)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_file_pattern_verify_details(self):
variant_1, vcf_line_1 = self._get_sample_variant_1()
variant_2, vcf_line_2 = self._get_sample_variant_2()
@@ -336,6 +349,10 @@ def test_read_after_splitting(self):
split_records.extend(source_test_utils.read_from_source(*source_info))
self.assertEqual(9882, len(split_records))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_invalid_file(self):
invalid_file_contents = self._get_invalid_file_contents()
for content in chain(*invalid_file_contents):
@@ -347,6 +364,10 @@ def test_invalid_file(self):
self._create_temp_vcf_file(content, tempdir)
self._read_records(os.path.join(tempdir.get_path(), '*.vcf'))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_allow_malformed_records(self):
invalid_records, invalid_headers = self._get_invalid_file_contents()
@@ -365,6 +386,10 @@ def test_allow_malformed_records(self):
self._read_records(self._create_temp_vcf_file(content, tempdir),
allow_malformed_records=True)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_no_samples(self):
header_line = '#CHROM POS ID REF ALT QUAL FILTER
INFO\n'
record_line = '19 123 . G A . PASS AF=0.2'
@@ -377,6 +402,10 @@ def test_no_samples(self):
self.assertEqual(1, len(read_data))
self.assertEqual(expected_variant, read_data[0])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_no_info(self):
record_line = 'chr19 123 . . . . .
. GT . .'
expected_variant = Variant(reference_name='chr19', start=122, end=123)
@@ -389,6 +418,10 @@ def test_no_info(self):
self.assertEqual(1, len(read_data))
self.assertEqual(expected_variant, read_data[0])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_info_numbers_and_types(self):
info_headers = [
'##INFO=<ID=HA,Number=A,Type=String,Description="StringInfo_A">\n',
@@ -422,6 +455,10 @@ def test_info_numbers_and_types(self):
self.assertEqual(2, len(read_data))
self._assert_variants_equal([variant_1, variant_2], read_data)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_end_info_key(self):
phaseset_header_line = (
'##INFO=<ID=END,Number=1,Type=Integer,Description="End of record.">\n')
@@ -440,6 +477,10 @@ def test_end_info_key(self):
self.assertEqual(2, len(read_data))
self._assert_variants_equal([variant_1, variant_2], read_data)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_custom_phaseset(self):
phaseset_header_line = (
'##FORMAT=<ID=PS,Number=1,Type=Integer,Description="Phaseset">\n')
@@ -463,6 +504,10 @@ def test_custom_phaseset(self):
self.assertEqual(2, len(read_data))
self._assert_variants_equal([variant_1, variant_2], read_data)
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_format_numbers(self):
format_headers = [
'##FORMAT=<ID=FU,Number=.,Type=String,Description="Format_variable">\n',
@@ -486,6 +531,10 @@ def test_format_numbers(self):
self.assertEqual(1, len(read_data))
self.assertEqual(expected_variant, read_data[0])
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_pipeline_read_single_file(self):
with TempDir() as tempdir:
file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
@@ -511,6 +560,10 @@ def test_pipeline_read_file_pattern_large(self):
assert_that(pcoll, _count_equals_to(9900))
pipeline.run()
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_read_reentrant_without_splitting(self):
with TempDir() as tempdir:
file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
@@ -518,6 +571,10 @@ def test_read_reentrant_without_splitting(self):
source = VcfSource(file_name)
source_test_utils.assert_reentrant_reads_succeed((source, None, None))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_read_reentrant_after_splitting(self):
with TempDir() as tempdir:
file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
@@ -528,6 +585,10 @@ def test_read_reentrant_after_splitting(self):
source_test_utils.assert_reentrant_reads_succeed(
(splits[0].source, splits[0].start_position,
splits[0].stop_position))
+ @unittest.skipIf(sys.version_info[0] == 3 and
+ os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5627')
def test_dynamic_work_rebalancing(self):
with TempDir() as tempdir:
file_name = self._create_temp_vcf_file(_SAMPLE_HEADER_LINES +
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ce94945a23e..a3db7903fca 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -140,7 +140,7 @@ def get_version():
# oauth2client >=4 only works with google-apitools>=0.5.18.
'google-apitools>=0.5.18,<=0.5.20',
'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
- 'googledatastore==7.0.1',
+ 'googledatastore==7.0.1; python_version < "3.0"',
'google-cloud-pubsub==0.35.4',
# GCP packages required by tests
'google-cloud-bigquery==0.25.0',
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index bf9175f972b..5ed55e9635e 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -56,8 +56,9 @@ commands =
[testenv:py3]
setenv =
BEAM_EXPERIMENTAL_PY3=1
+ RUN_SKIPPED_PY3_TESTS=0
modules =
-
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners
+
apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp.gcsfilesystem_test,apache_beam.io.gcp.gcsio_test,apache_beam.io.gcp.pubsub_integration_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.internal,apache_beam.io.filesystem_test,apache_beam.io.filesystems,apache_beam.io.range_trackers_test,apache_beam.io.sources_test
commands =
python --version
pip --version
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 152895)
Time Spent: 4h (was: 3h 50m)
> Finish Python 3 porting for io module
> -------------------------------------
>
> Key: BEAM-5315
> URL: https://issues.apache.org/jira/browse/BEAM-5315
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Robbe
> Assignee: Simon
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)