[
https://issues.apache.org/jira/browse/BEAM-4000?focusedWorklogId=120037&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120037
]
ASF GitHub Bot logged work on BEAM-4000:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Jul/18 21:18
Start Date: 06/Jul/18 21:18
Worklog Time Spent: 10m
Work Description: charlesccychen closed pull request #5715: [BEAM-4000] Futurize io subpackage
URL: https://github.com/apache/beam/pull/5715
This PR was merged from a forked repository. Because GitHub does not show
the original diff of a pull request from a fork once it has been merged,
the diff is reproduced below for the sake of provenance:
diff --git a/sdks/python/apache_beam/io/__init__.py
b/sdks/python/apache_beam/io/__init__.py
index 6ea0efdf65f..4cbb4458864 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -18,6 +18,8 @@
"""A package defining several input sources and output sinks."""
# pylint: disable=wildcard-import
+from __future__ import absolute_import
+
from apache_beam.io.avroio import *
from apache_beam.io.filebasedsink import *
from apache_beam.io.iobase import Read
diff --git a/sdks/python/apache_beam/io/avroio.py
b/sdks/python/apache_beam/io/avroio.py
index 1368734f17d..9b86b58982b 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -41,10 +41,12 @@
that can be used to write a given ``PCollection`` of Python objects to an
Avro file.
"""
+from __future__ import absolute_import
-import cStringIO
+import io
import os
import zlib
+from builtins import object
from functools import partial
import avro
@@ -341,7 +343,7 @@ def _decompress_bytes(data, codec):
# We take care to avoid extra copies of data while slicing large objects
# by use of a buffer.
result = snappy.decompress(buffer(data)[:-4])
- avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
+ avroio.BinaryDecoder(io.BytesIO(data[-4:])).check_crc32(result)
return result
else:
raise ValueError('Unknown codec: %r' % codec)
@@ -351,7 +353,7 @@ def num_records(self):
def records(self):
decoder = avroio.BinaryDecoder(
- cStringIO.StringIO(self._decompressed_block_bytes))
+ io.BytesIO(self._decompressed_block_bytes))
reader = avroio.DatumReader(
writers_schema=self._schema, readers_schema=self._schema)
diff --git a/sdks/python/apache_beam/io/avroio_test.py
b/sdks/python/apache_beam/io/avroio_test.py
index 9b9a855fcdf..93f2ba9ebfd 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -14,12 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from __future__ import absolute_import
import json
import logging
import os
import tempfile
import unittest
+from builtins import range
import avro.datafile
import avro.schema
diff --git a/sdks/python/apache_beam/io/concat_source.py
b/sdks/python/apache_beam/io/concat_source.py
index 56c4ccabcf7..ddf3a77745e 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -19,9 +19,12 @@
Concat Source, which reads the union of several other sources.
"""
+from __future__ import absolute_import
+from __future__ import division
import bisect
import threading
+from builtins import range
from apache_beam.io import iobase
diff --git a/sdks/python/apache_beam/io/concat_source_test.py
b/sdks/python/apache_beam/io/concat_source_test.py
index 0f7dd547e76..31e4392f645 100644
--- a/sdks/python/apache_beam/io/concat_source_test.py
+++ b/sdks/python/apache_beam/io/concat_source_test.py
@@ -16,9 +16,12 @@
#
"""Unit tests for the sources framework."""
+from __future__ import absolute_import
+from __future__ import division
import logging
import unittest
+from builtins import range
import apache_beam as beam
from apache_beam.io import iobase
@@ -91,10 +94,10 @@ def test_conact_source(self):
RangeSource(12, 16),
])
self.assertEqual(list(source.read(source.get_range_tracker())),
- range(16))
+ list(range(16)))
self.assertEqual(list(source.read(source.get_range_tracker((1, None),
(2, 10)))),
- range(4, 10))
+ list(range(4, 10)))
range_tracker = source.get_range_tracker(None, None)
self.assertEqual(range_tracker.position_at_fraction(0), (0, 0))
self.assertEqual(range_tracker.position_at_fraction(.5), (2, 8))
@@ -176,10 +179,11 @@ def test_single_source(self):
read_all = source_test_utils.read_from_source
range10 = RangeSource(0, 10)
- self.assertEquals(read_all(ConcatSource([range10])), range(10))
- self.assertEquals(read_all(ConcatSource([range10]), (0, 5)), range(5, 10))
+ self.assertEquals(read_all(ConcatSource([range10])), list(range(10)))
+ self.assertEquals(read_all(ConcatSource([range10]), (0, 5)),
+ list(range(5, 10)))
self.assertEquals(read_all(ConcatSource([range10]), None, (0, 5)),
- range(5))
+ list(range(5)))
def test_source_with_empty_ranges(self):
read_all = source_test_utils.read_from_source
@@ -189,11 +193,11 @@ def test_source_with_empty_ranges(self):
range10 = RangeSource(0, 10)
self.assertEquals(read_all(ConcatSource([empty, empty, range10])),
- range(10))
+ list(range(10)))
self.assertEquals(read_all(ConcatSource([empty, range10, empty])),
- range(10))
+ list(range(10)))
self.assertEquals(read_all(ConcatSource([range10, empty, range10, empty])),
- range(10) + range(10))
+ list(range(10)) + list(range(10)))
def test_source_with_empty_ranges_exhastive(self):
empty = RangeSource(0, 0)
@@ -214,7 +218,7 @@ def test_run_concat_direct(self):
])
pipeline = TestPipeline()
pcoll = pipeline | beam.io.Read(source)
- assert_that(pcoll, equal_to(range(1000)))
+ assert_that(pcoll, equal_to(list(range(1000))))
pipeline.run()
diff --git a/sdks/python/apache_beam/io/filebasedsink.py
b/sdks/python/apache_beam/io/filebasedsink.py
index 4c587b965f0..c4a746f5289 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -24,8 +24,10 @@
import re
import time
import uuid
+from builtins import range
+from builtins import zip
-from six import string_types
+from future.utils import iteritems
from apache_beam.internal import util
from apache_beam.io import iobase
@@ -41,6 +43,11 @@
__all__ = ['FileBasedSink']
+try:
+ unicode # pylint: disable=unicode-builtin
+except NameError:
+ unicode = str
+
class FileBasedSink(iobase.Sink):
"""A sink to a GCS or local files.
@@ -75,10 +82,10 @@ def __init__(self,
~exceptions.ValueError: if **shard_name_template** is not of expected
format.
"""
- if not isinstance(file_path_prefix, (string_types, ValueProvider)):
+ if not isinstance(file_path_prefix, ((str, unicode), ValueProvider)):
raise TypeError('file_path_prefix must be a string or ValueProvider;'
'got %r instead' % file_path_prefix)
- if not isinstance(file_name_suffix, (string_types, ValueProvider)):
+ if not isinstance(file_name_suffix, ((str, unicode), ValueProvider)):
raise TypeError('file_name_suffix must be a string or ValueProvider;'
'got %r instead' % file_name_suffix)
@@ -89,9 +96,9 @@ def __init__(self,
shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
elif shard_name_template == '':
num_shards = 1
- if isinstance(file_path_prefix, string_types):
+ if isinstance(file_path_prefix, (str, unicode)):
file_path_prefix = StaticValueProvider(str, file_path_prefix)
- if isinstance(file_name_suffix, string_types):
+ if isinstance(file_name_suffix, (str, unicode)):
file_name_suffix = StaticValueProvider(str, file_name_suffix)
self.file_path_prefix = file_path_prefix
self.file_name_suffix = file_name_suffix
@@ -297,7 +304,7 @@ def _rename_batch(batch):
except BeamIOError as exp:
if exp.exception_details is None:
raise
- for (src, dst), exception in exp.exception_details.iteritems():
+ for (src, dst), exception in iteritems(exp.exception_details):
if exception:
logging.error(('Exception in _rename_batch. src: %s, '
'dst: %s, err: %s'), src, dst, exception)
@@ -307,7 +314,7 @@ def _rename_batch(batch):
return exceptions
exception_batches = util.run_using_threadpool(
- _rename_batch, zip(source_file_batch, destination_file_batch),
+ _rename_batch, list(zip(source_file_batch, destination_file_batch)),
num_threads)
all_exceptions = [e for exception_batch in exception_batches
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py
b/sdks/python/apache_beam/io/filebasedsink_test.py
index 05ac5228992..b79370eff4a 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -18,12 +18,15 @@
"""Unit tests for file sinks."""
+from __future__ import absolute_import
+
import glob
import logging
import os
import shutil
import tempfile
import unittest
+from builtins import range
import hamcrest as hc
import mock
diff --git a/sdks/python/apache_beam/io/filebasedsource.py
b/sdks/python/apache_beam/io/filebasedsource.py
index 4509a3616b5..ec16b060c32 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,8 +26,10 @@
:class:`~apache_beam.io._AvroSource`.
"""
-from six import integer_types
-from six import string_types
+from __future__ import absolute_import
+
+from past.builtins import long
+from past.builtins import unicode
from apache_beam.internal import pickler
from apache_beam.io import concat_source
@@ -99,12 +101,12 @@ def __init__(self,
result.
"""
- if not isinstance(file_pattern, (string_types, ValueProvider)):
+ if not isinstance(file_pattern, ((str, unicode), ValueProvider)):
raise TypeError('%s: file_pattern must be of type string'
' or ValueProvider; got %r instead'
% (self.__class__.__name__, file_pattern))
- if isinstance(file_pattern, string_types):
+ if isinstance(file_pattern, (str, unicode)):
file_pattern = StaticValueProvider(str, file_pattern)
self._pattern = file_pattern
@@ -235,11 +237,11 @@ class _SingleFileSource(iobase.BoundedSource):
def __init__(self, file_based_source, file_name, start_offset, stop_offset,
min_bundle_size=0, splittable=True):
- if not isinstance(start_offset, integer_types):
+ if not isinstance(start_offset, (int, long)):
raise TypeError(
'start_offset must be a number. Received: %r' % start_offset)
if stop_offset != range_trackers.OffsetRangeTracker.OFFSET_INFINITY:
- if not isinstance(stop_offset, integer_types):
+ if not isinstance(stop_offset, (int, long)):
raise TypeError(
'stop_offset must be a number. Received: %r' % stop_offset)
if start_offset >= stop_offset:
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py
b/sdks/python/apache_beam/io/filebasedsource_test.py
index c567b24e77a..e9312238f5b 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -15,15 +14,19 @@
# limitations under the License.
#
+from __future__ import absolute_import
+
import bz2
-import cStringIO
import gzip
+import io
import logging
import math
import os
import random
import tempfile
import unittest
+from builtins import object
+from builtins import range
import hamcrest as hc
@@ -153,7 +156,7 @@ def __init__(self, values):
def split(self, desired_bundle_size, start_position=None,
stop_position=None):
# simply devides values into two bundles
- middle = len(self._values) / 2
+ middle = len(self._values) // 2
yield iobase.SourceBundle(0.5, TestConcatSource.DummySource(
self._values[:middle]), None, None)
yield iobase.SourceBundle(0.5, TestConcatSource.DummySource(
@@ -188,11 +191,11 @@ def test_read(self):
concat = ConcatSource(sources)
range_tracker = concat.get_range_tracker(None, None)
read_data = [value for value in concat.read(range_tracker)]
- self.assertItemsEqual(range(30), read_data)
+ self.assertItemsEqual(list(range(30)), read_data)
def test_split(self):
- sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
- in [0, 10, 20]]
+ sources = [TestConcatSource.DummySource(list(range(start, start + 10)))
+ for start in [0, 10, 20]]
concat = ConcatSource(sources)
splits = [split for split in concat.split()]
self.assertEquals(6, len(splits))
@@ -205,7 +208,7 @@ def test_split(self):
split.stop_position)
read_data.extend([value for value in split.source.read(
range_tracker_for_split)])
- self.assertItemsEqual(range(30), read_data)
+ self.assertItemsEqual(list(range(30)), read_data)
def test_estimate_size(self):
sources = [TestConcatSource.DummySource(range(start, start + 10)) for start
@@ -473,8 +476,8 @@ def test_read_pattern_gzip(self):
chunks = [lines[splits[i-1]:splits[i]] for i in range(1, len(splits))]
compressed_chunks = []
for c in chunks:
- out = cStringIO.StringIO()
- with gzip.GzipFile(fileobj=out, mode="w") as f:
+ out = io.BytesIO()
+ with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(compressed_chunks)
@@ -520,8 +523,8 @@ def test_read_auto_pattern(self):
chunks = [lines[splits[i - 1]:splits[i]] for i in range(1, len(splits))]
compressed_chunks = []
for c in chunks:
- out = cStringIO.StringIO()
- with gzip.GzipFile(fileobj=out, mode="w") as f:
+ out = io.BytesIO()
+ with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(
@@ -540,8 +543,8 @@ def test_read_auto_pattern_compressed_and_uncompressed(self):
chunks_to_write = []
for i, c in enumerate(chunks):
if i%2 == 0:
- out = cStringIO.StringIO()
- with gzip.GzipFile(fileobj=out, mode="w") as f:
+ out = io.BytesIO()
+ with gzip.GzipFile(fileobj=out, mode="wb") as f:
f.write('\n'.join(c))
chunks_to_write.append(out.getvalue())
else:
diff --git a/sdks/python/apache_beam/io/filesystem.py
b/sdks/python/apache_beam/io/filesystem.py
index 0b99793dca0..a9dafe6df70 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -22,23 +22,28 @@
"""
from __future__ import absolute_import
+from __future__ import division
import abc
import bz2
-import cStringIO
import fnmatch
+import io
import logging
import os
import posixpath
import re
import time
import zlib
+from builtins import object
+from builtins import zip
-from six import integer_types
-from six import string_types
+from future import standard_library
+from future.utils import with_metaclass
from apache_beam.utils.plugin import BeamPlugin
+standard_library.install_aliases()
+
logger = logging.getLogger(__name__)
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -46,6 +51,13 @@
__all__ = ['CompressionTypes', 'CompressedFile', 'FileMetadata', 'FileSystem',
'MatchResult']
+try:
+ unicode # pylint: disable=unicode-builtin
+ long # pylint: disable=long-builtin
+except NameError:
+ unicode = str
+ long = int
+
class CompressionTypes(object):
"""Enum-like class representing known compression types."""
@@ -91,7 +103,7 @@ def detect_compression_type(cls, file_path):
"""Returns the compression type of a file (based on its suffix)."""
compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
lowercased_path = file_path.lower()
- for suffix, compression_type in compression_types_by_suffix.iteritems():
+ for suffix, compression_type in compression_types_by_suffix.items():
if lowercased_path.endswith(suffix):
return compression_type
return cls.UNCOMPRESSED
@@ -131,7 +143,7 @@ def __init__(self,
if self.readable():
self._read_size = read_size
- self._read_buffer = cStringIO.StringIO()
+ self._read_buffer = io.BytesIO()
self._read_position = 0
self._read_eof = False
@@ -246,20 +258,20 @@ def readline(self):
if not self._decompressor:
raise ValueError('decompressor not initialized')
- io = cStringIO.StringIO()
+ bytes_io = io.BytesIO()
while True:
# Ensure that the internal buffer has at least half the read_size. Going
# with half the _read_size (as opposed to a full _read_size) to ensure
# that actual fetches are more evenly spread out, as opposed to having 2
# consecutive reads at the beginning of a read.
- self._fetch_to_internal_buffer(self._read_size / 2)
+ self._fetch_to_internal_buffer(self._read_size // 2)
line = self._read_from_internal_buffer(
lambda: self._read_buffer.readline())
- io.write(line)
+ bytes_io.write(line)
if line.endswith('\n') or not line:
break # Newline or EOF reached.
- return io.getvalue()
+ return bytes_io.getvalue()
def closed(self):
return not self._file or self._file.closed()
@@ -382,8 +394,8 @@ class FileMetadata(object):
"""Metadata about a file path that is the output of FileSystem.match
"""
def __init__(self, path, size_in_bytes):
- assert isinstance(path, string_types) and path, "Path should be a string"
- assert isinstance(size_in_bytes, integer_types) and size_in_bytes >= 0, \
+ assert isinstance(path, (str, unicode)) and path, "Path should be a string"
+ assert isinstance(size_in_bytes, (int, long)) and size_in_bytes >= 0, \
"Invalid value for size_in_bytes should %s (of type %s)" % (
size_in_bytes, type(size_in_bytes))
self.path = path
@@ -432,14 +444,13 @@ def __init__(self, msg, exception_details=None):
self.exception_details = exception_details
-class FileSystem(BeamPlugin):
+class FileSystem(with_metaclass(abc.ABCMeta, BeamPlugin)):
"""A class that defines the functions that can be performed on a filesystem.
All methods are abstract and they are for file system providers to
implement. Clients should use the FileSystems class to interact with
the correct file system based on the provided file pattern scheme.
"""
- __metaclass__ = abc.ABCMeta
CHUNK_SIZE = 1 # Chuck size in the batch operations
def __init__(self, pipeline_options):
diff --git a/sdks/python/apache_beam/io/filesystem_test.py
b/sdks/python/apache_beam/io/filesystem_test.py
index f50f25e79e5..8c9ffde4fee 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -17,19 +17,28 @@
#
"""Unit tests for filesystem module."""
+from __future__ import absolute_import
+from __future__ import division
+
import bz2
import gzip
import logging
import os
import tempfile
import unittest
-from StringIO import StringIO
+from builtins import range
+from io import BytesIO
+
+from future import standard_library
+from future.utils import iteritems
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
+standard_library.install_aliases()
+
class TestingFileSystem(FileSystem):
@@ -59,7 +68,7 @@ def _insert_random_file(self, path, size):
self._files[path] = size
def _list(self, dir_or_prefix):
- for path, size in self._files.iteritems():
+ for path, size in iteritems(self._files):
if path.startswith(dir_or_prefix):
yield FileMetadata(path, size)
@@ -254,14 +263,15 @@ def test_seek_set(self):
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(f, compression_type,
read_size=self.read_block_size)
- reference_fd = StringIO(self.content)
+ reference_fd = BytesIO(self.content)
- # Note: content (readline) check must come before position (tell) check
- # because cStringIO's tell() reports out of bound positions (if we seek
- # beyond the file) up until a real read occurs.
+ # Note: BytesIO's tell() reports out of bound positions (if we seek
+ # beyond the file), therefore we need to cap it to max_position
# _CompressedFile.tell() always stays within the bounds of the
# uncompressed content.
- for seek_position in (-1, 0, 1,
+ # Negative seek position argument is not supported for BytesIO with
+ # whence set to SEEK_SET.
+ for seek_position in (0, 1,
len(self.content)-1, len(self.content),
len(self.content) + 1):
compressed_fd.seek(seek_position, os.SEEK_SET)
@@ -273,6 +283,8 @@ def test_seek_set(self):
uncompressed_position = compressed_fd.tell()
reference_position = reference_fd.tell()
+ max_position = len(self.content)
+ reference_position = min(reference_position, max_position)
self.assertEqual(uncompressed_position, reference_position)
def test_seek_cur(self):
@@ -281,13 +293,16 @@ def test_seek_cur(self):
with open(file_name, 'rb') as f:
compressed_fd = CompressedFile(f, compression_type,
read_size=self.read_block_size)
- reference_fd = StringIO(self.content)
+ reference_fd = BytesIO(self.content)
# Test out of bound, inbound seeking in both directions
+ # Note: BytesIO's seek() reports out of bound positions (if we seek
+ # beyond the file), therefore we need to cap it to max_position (to
+ # make it consistent with the old StringIO behavior
for seek_position in (-1, 0, 1,
- len(self.content) / 2,
- len(self.content) / 2,
- -1 * len(self.content) / 2):
+ len(self.content) // 2,
+ len(self.content) // 2,
+ -1 * len(self.content) // 2):
compressed_fd.seek(seek_position, os.SEEK_CUR)
reference_fd.seek(seek_position, os.SEEK_CUR)
@@ -297,6 +312,9 @@ def test_seek_cur(self):
reference_position = reference_fd.tell()
uncompressed_position = compressed_fd.tell()
+ max_position = len(self.content)
+ reference_position = min(reference_position, max_position)
+ reference_fd.seek(reference_position, os.SEEK_SET)
self.assertEqual(uncompressed_position, reference_position)
def test_read_from_end_returns_no_data(self):
diff --git a/sdks/python/apache_beam/io/filesystemio.py
b/sdks/python/apache_beam/io/filesystemio.py
index 35e141bb756..086ae164aef 100644
--- a/sdks/python/apache_beam/io/filesystemio.py
+++ b/sdks/python/apache_beam/io/filesystemio.py
@@ -16,22 +16,25 @@
#
"""Utilities for ``FileSystem`` implementations."""
+from __future__ import absolute_import
+
import abc
import io
import os
+from builtins import object
+
+from future.utils import with_metaclass
__all__ = ['Downloader', 'Uploader', 'DownloaderStream', 'UploaderStream',
'PipeStream']
-class Downloader(object):
+class Downloader(with_metaclass(abc.ABCMeta, object)):
"""Download interface for a single file.
Implementations should support random access reads.
"""
- __metaclass__ = abc.ABCMeta
-
@abc.abstractproperty
def size(self):
"""Size of file to download."""
@@ -52,11 +55,9 @@ def get_range(self, start, end):
"""
-class Uploader(object):
+class Uploader(with_metaclass(abc.ABCMeta, object)):
"""Upload interface for a single file."""
- __metaclass__ = abc.ABCMeta
-
@abc.abstractmethod
def put(self, data):
"""Write data to file sequentially.
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py
b/sdks/python/apache_beam/io/filesystemio_test.py
index 41f23836104..75079a539c4 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -16,12 +16,15 @@
#
"""Tests for filesystemio."""
+from __future__ import absolute_import
+
import io
import logging
import multiprocessing
import os
import threading
import unittest
+from builtins import range
from apache_beam.io import filesystemio
diff --git a/sdks/python/apache_beam/io/filesystems.py
b/sdks/python/apache_beam/io/filesystems.py
index 66eff061fb0..55ad5d1e33e 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -17,9 +17,10 @@
"""FileSystems interface class for accessing the correct filesystem"""
-import re
+from __future__ import absolute_import
-from six import string_types
+import re
+from builtins import object
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressionTypes
@@ -46,6 +47,12 @@
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
except ImportError:
pass
+
+try:
+ unicode # pylint: disable=unicode-builtin
+except NameError:
+ unicode = str
+
# pylint: enable=wrong-import-position, unused-import
__all__ = ['FileSystems']
@@ -273,7 +280,7 @@ def delete(paths):
Raises:
``BeamIOError`` if any of the delete operations fail
"""
- if isinstance(paths, string_types):
+ if isinstance(paths, (str, unicode)):
raise BeamIOError('Delete passed string argument instead of list: %s' %
paths)
if len(paths) == 0:
diff --git a/sdks/python/apache_beam/io/filesystems_test.py
b/sdks/python/apache_beam/io/filesystems_test.py
index c084a3cf7bb..383eb40ea07 100644
--- a/sdks/python/apache_beam/io/filesystems_test.py
+++ b/sdks/python/apache_beam/io/filesystems_test.py
@@ -18,6 +18,8 @@
"""Unit tests for LocalFileSystem."""
+from __future__ import absolute_import
+
import filecmp
import logging
import os
@@ -124,7 +126,7 @@ def test_match_file_exception(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Unable to get the Filesystem') as error:
FileSystems.match([None])
- self.assertEqual(error.exception.exception_details.keys(), [None])
+ self.assertEqual(list(error.exception.exception_details), [None])
def test_match_directory(self):
path1 = os.path.join(self.tmpdir, 'f1')
@@ -158,7 +160,8 @@ def test_copy_error(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Copy operation failed') as error:
FileSystems.copy([path1], [path2])
- self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+ self.assertEqual(list(error.exception.exception_details.keys()),
+ [(path1, path2)])
def test_copy_directory(self):
path_t1 = os.path.join(self.tmpdir, 't1')
@@ -190,7 +193,8 @@ def test_rename_error(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Rename operation failed') as error:
FileSystems.rename([path1], [path2])
- self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+ self.assertEqual(list(error.exception.exception_details.keys()),
+ [(path1, path2)])
def test_rename_directory(self):
path_t1 = os.path.join(self.tmpdir, 't1')
@@ -231,7 +235,7 @@ def test_delete_error(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Delete operation failed') as error:
FileSystems.delete([path1])
- self.assertEqual(error.exception.exception_details.keys(), [path1])
+ self.assertEqual(list(error.exception.exception_details.keys()), [path1])
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/__init__.py
b/sdks/python/apache_beam/io/gcp/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/io/gcp/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/__init__.py
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 41d0833217b..be5f3cb6cda 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -119,8 +119,11 @@
import re
import time
import uuid
+from builtins import object
+from builtins import zip
-from six import string_types
+from future.utils import iteritems
+from future.utils import itervalues
from apache_beam import coders
from apache_beam.internal.gcp import auth
@@ -143,6 +146,10 @@
pass
# pylint: enable=wrong-import-order, wrong-import-position
+try:
+ unicode # pylint: disable=unicode-builtin
+except NameError:
+ unicode = str
__all__ = [
'TableRowJsonCoder',
@@ -212,7 +219,7 @@ def decode(self, encoded_table_row):
od = json.loads(
encoded_table_row, object_pairs_hook=collections.OrderedDict)
return bigquery.TableRow(
- f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
+ f=[bigquery.TableCell(v=to_json_value(e)) for e in itervalues(od)])
def parse_table_schema_from_json(schema_string):
@@ -524,7 +531,7 @@ def __init__(self, table, dataset=None, project=None, schema=None,
self.table_reference = _parse_table_reference(table, dataset, project)
# Transform the table schema into a bigquery.TableSchema instance.
- if isinstance(schema, string_types):
+ if isinstance(schema, (str, unicode)):
# TODO(silviuc): Should add a regex-based validation of the format.
table_schema = bigquery.TableSchema()
schema_list = [s.strip(' ') for s in schema.split(',')]
@@ -1103,7 +1110,7 @@ def insert_rows(self, project_id, dataset_id, table_id, rows):
final_rows = []
for row in rows:
json_object = bigquery.JsonObject()
- for k, v in row.iteritems():
+ for k, v in iteritems(row):
json_object.additionalProperties.append(
bigquery.JsonObject.AdditionalProperty(
key=k, value=to_json_value(v)))
@@ -1415,7 +1422,7 @@ def get_dict_table_schema(schema):
return schema
elif schema is None:
return schema
- elif isinstance(schema, string_types):
+ elif isinstance(schema, (str, unicode)):
table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
return WriteToBigQuery.table_schema_to_dict(table_schema)
elif isinstance(schema, bigquery.TableSchema):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index ff6721e6d91..843fc394ff1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -16,6 +16,7 @@
#
"""Unit tests for BigQuery sources and sinks."""
+from __future__ import absolute_import
import datetime
import json
@@ -26,6 +27,7 @@
import hamcrest as hc
import mock
+from future.utils import iteritems
import apache_beam as beam
from apache_beam.internal.gcp.json_value import to_json_value
@@ -698,7 +700,7 @@ def test_rows_are_written(self):
sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
expected_rows = []
json_object = bigquery.JsonObject()
- for k, v in sample_row.iteritems():
+ for k, v in iteritems(sample_row):
json_object.additionalProperties.append(
bigquery.JsonObject.AdditionalProperty(
key=k, value=to_json_value(v)))
diff --git a/sdks/python/apache_beam/io/gcp/datastore/__init__.py
b/sdks/python/apache_beam/io/gcp/datastore/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/__init__.py
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/__init__.py
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
index 7d94f24ca85..f6c65a5fa1c 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler.py
@@ -19,7 +19,11 @@
#
# For internal use only; no backwards-compatibility guarantees.
+from __future__ import absolute_import
+from __future__ import division
+
import random
+from builtins import object
from apache_beam.io.gcp.datastore.v1 import util
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
index 1ac23930f65..e3ccb921188 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/adaptive_throttler_test.py
@@ -15,7 +15,11 @@
# limitations under the License.
#
+from __future__ import absolute_import
+from __future__ import division
+
import unittest
+from builtins import range
from mock import patch
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 13209c17bd2..437f38850ba 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -16,9 +16,12 @@
#
"""A connector for reading from and writing to Google Cloud Datastore"""
+from __future__ import absolute_import
+from __future__ import division
import logging
import time
+from builtins import object
from apache_beam.io.gcp.datastore.v1 import helper
from apache_beam.io.gcp.datastore.v1 import query_splitter
@@ -362,11 +365,11 @@ def get_batch_size(self, now):
return _Mutate._WRITE_BATCH_INITIAL_SIZE
recent_mean_latency_ms = (self._commit_time_per_entity_ms.sum(now)
- / self._commit_time_per_entity_ms.count(now))
+ // self._commit_time_per_entity_ms.count(now))
return max(_Mutate._WRITE_BATCH_MIN_SIZE,
min(_Mutate._WRITE_BATCH_MAX_SIZE,
_Mutate._WRITE_BATCH_TARGET_LATENCY_MS
- / max(recent_mean_latency_ms, 1)
+ // max(recent_mean_latency_ms, 1)
))
def report_latency(self, now, latency_ms, num_mutations):
@@ -444,7 +447,7 @@ def _flush_batch(self):
_, latency_ms = helper.write_mutations(
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
- throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
+ throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS//1000)
logging.debug("Successfully wrote %d mutations in %dms.",
len(self._mutations), latency_ms)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index e131f93d520..b7bc22a5870 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -15,9 +15,14 @@
# limitations under the License.
#
+from __future__ import absolute_import
+from __future__ import division
from __future__ import print_function
import unittest
+from builtins import map
+from builtins import range
+from builtins import zip
from mock import MagicMock
from mock import call
@@ -179,7 +184,8 @@ def check_DatastoreWriteFn(self, num_entities):
entities = [e.entity for e in
fake_datastore.create_entities(num_entities)]
- expected_mutations = map(WriteToDatastore.to_upsert_mutation, entities)
+ expected_mutations = list(map(WriteToDatastore.to_upsert_mutation,
+ entities))
actual_mutations = []
self._mock_datastore.commit.side_effect = (
@@ -195,7 +201,7 @@ def check_DatastoreWriteFn(self, num_entities):
self.assertEqual(actual_mutations, expected_mutations)
self.assertEqual(
- (num_entities - 1) / _Mutate._WRITE_BATCH_INITIAL_SIZE + 1,
+ (num_entities - 1) // _Mutate._WRITE_BATCH_INITIAL_SIZE + 1,
self._mock_datastore.commit.call_count)
def test_DatastoreWriteLargeEntities(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
index aa3780558d7..054df9ddbcc 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/fake_datastore.py
@@ -20,7 +20,10 @@
For internal use only; no backwards-compatibility guarantees.
"""
+from __future__ import absolute_import
+
import uuid
+from builtins import range
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index 87d798bebe3..a27df09ac0e 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -24,9 +24,11 @@
import logging
import sys
import time
+from builtins import object
from socket import error as SocketError
-import six
+from future.builtins import next
+from past.builtins import unicode
# pylint: disable=ungrouped-imports
from apache_beam.internal.gcp import auth
@@ -90,7 +92,7 @@ def compare_path(p1, p2):
3. If no `id` is defined for both paths, then their `names` are compared.
"""
- result = cmp(p1.kind, p2.kind)
+ result = (p1.kind > p2.kind) - (p1.kind < p2.kind)
if result != 0:
return result
@@ -98,12 +100,12 @@ def compare_path(p1, p2):
if not p2.HasField('id'):
return -1
- return cmp(p1.id, p2.id)
+ return (p1.id > p2.id) - (p1.id < p2.id)
if p2.HasField('id'):
return 1
- return cmp(p1.name, p2.name)
+ return (p1.name > p2.name) - (p1.name < p2.name)
def get_datastore(project):
@@ -255,7 +257,7 @@ def make_kind_stats_query(namespace, kind, latest_timestamp):
kind_filter = datastore_helper.set_property_filter(
query_pb2.Filter(), 'kind_name', PropertyFilter.EQUAL,
- six.text_type(kind))
+ unicode(kind))
timestamp_filter = datastore_helper.set_property_filter(
query_pb2.Filter(), 'timestamp', PropertyFilter.EQUAL,
latest_timestamp)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 90a366842da..3e3f51762f4 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -16,10 +16,13 @@
#
"""Tests for datastore helper."""
+from __future__ import absolute_import
+
import errno
import random
import sys
import unittest
+from builtins import map
from socket import error as SocketError
from mock import MagicMock
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
index d5674f9cbf1..7723fb7b646 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
@@ -16,6 +16,10 @@
#
"""Implements a Cloud Datastore query splitter."""
+from __future__ import absolute_import
+from __future__ import division
+
+from builtins import range
from apache_beam.io.gcp.datastore.v1 import helper
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
index 52f25facd05..c5bfdd81cf8 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter_test.py
@@ -17,6 +17,8 @@
"""Cloud Datastore query splitter test."""
+from __future__ import absolute_import
+
import unittest
from mock import MagicMock
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
b/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
index 5670a241ba8..d796225e185 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
@@ -19,7 +19,12 @@
#
# For internal use only; no backwards-compatibility guarantees.
+from __future__ import absolute_import
+from __future__ import division
+
import math
+from builtins import object
+from builtins import range
class MovingSum(object):
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index b861446c87e..a52df0ef946 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -18,6 +18,10 @@
from __future__ import absolute_import
+from builtins import zip
+
+from future.utils import iteritems
+
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
@@ -117,7 +121,7 @@ def _list(self, dir_or_prefix):
``BeamIOError`` if listing fails, but not if no files were found.
"""
try:
- for path, size in gcsio.GcsIO().list_prefix(dir_or_prefix).iteritems():
+ for path, size in iteritems(gcsio.GcsIO().list_prefix(dir_or_prefix)):
yield FileMetadata(path, size)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index f0d46f9620b..88f7ce93f46 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -20,6 +20,7 @@
import logging
import unittest
+from builtins import zip
import mock
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py
b/sdks/python/apache_beam/io/gcp/gcsio.py
index 72d1de40153..a76199e865d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -20,7 +20,8 @@
https://github.com/GoogleCloudPlatform/appengine-gcs-client.
"""
-import cStringIO
+from __future__ import absolute_import
+
import errno
import io
import logging
@@ -30,6 +31,7 @@
import threading
import time
import traceback
+from builtins import object
import httplib2
@@ -468,7 +470,7 @@ def __init__(self, client, path, buffer_size):
self._get_request.generation = metadata.generation
# Initialize read buffer state.
- self._download_stream = cStringIO.StringIO()
+ self._download_stream = io.BytesIO()
self._downloader = transfer.Download(
self._download_stream, auto_transfer=False,
chunksize=self._buffer_size)
self._client.objects.Get(self._get_request, download=self._downloader)
@@ -483,6 +485,7 @@ def size(self):
return self._size
def get_range(self, start, end):
+ self._download_stream.seek(0)
self._download_stream.truncate(0)
self._downloader.GetRange(start, end - 1)
return self._download_stream.getvalue()
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py
b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index b2bb43e30e8..b10926c4190 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -15,12 +15,15 @@
# limitations under the License.
#
"""Tests for Google Cloud Storage client."""
+from __future__ import division
import errno
import logging
import os
import random
import unittest
+from builtins import object
+from builtins import range
import httplib2
import mock
@@ -606,7 +609,7 @@ def test_context_manager(self):
# Test that exceptions are not swallowed by the context manager.
with self.assertRaises(ZeroDivisionError):
with self.gcs.open(file_name) as f:
- f.read(0 / 0)
+ f.read(0 // 0)
def test_list_prefix(self):
bucket_name = 'gcsio-test'
@@ -637,7 +640,7 @@ def test_list_prefix(self):
expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
for (object_name, size) in expected_object_names]
self.assertEqual(
- set(self.gcs.list_prefix(file_pattern).iteritems()),
+ set(self.gcs.list_prefix(file_pattern).items()),
set(expected_file_names))
diff --git a/sdks/python/apache_beam/io/gcp/internal/__init__.py
b/sdks/python/apache_beam/io/gcp/internal/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/io/gcp/internal/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/internal/__init__.py
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py
b/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py
index cce3acad34a..f4f43cbb123 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/__init__.py
@@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from __future__ import absolute_import
diff --git
a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py
b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py
index 732c1c6bd5d..e5d35e8f805 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/__init__.py
@@ -18,6 +18,8 @@
"""Common imports for generated bigquery client library."""
# pylint:disable=wildcard-import
+from __future__ import absolute_import
+
import pkgutil
# Protect against environments where apitools library is not available.
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
index 201a1830b87..9b8cddee03f 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py
@@ -17,9 +17,12 @@
"""Generated client library for bigquery version v2."""
# NOTE: This file is autogenerated and should not be edited by hand.
+from __future__ import absolute_import
+
from apitools.base.py import base_api
-from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_messages as messages
+from apache_beam.io.gcp.internal.clients.bigquery import \
+ bigquery_v2_messages as messages
class BigqueryV2(base_api.BaseApiClient):
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py
index 3e741cdbbd6..d96582eaa61 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_messages.py
@@ -20,10 +20,10 @@
A data platform for customers to create, manage, share and query data.
"""
# NOTE: This file is autogenerated and should not be edited by hand.
+from __future__ import absolute_import
from apitools.base.protorpclite import messages as _messages
-from apitools.base.py import encoding
-from apitools.base.py import extra_types
+from apitools.base.py import encoding, extra_types
package = 'bigquery'
diff --git
a/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
index 8a726ef85da..c26332355b7 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/__init__.py
@@ -18,6 +18,8 @@
"""Common imports for generated storage client library."""
# pylint:disable=wildcard-import
+from __future__ import absolute_import
+
import pkgutil
# Protect against environments where apitools library is not available.
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
index 1b46d917f14..1335bd4fe10 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py
@@ -17,9 +17,13 @@
"""Generated client library for storage version v1."""
# NOTE: This file is autogenerated and should not be edited by hand.
+
+from __future__ import absolute_import
+
from apitools.base.py import base_api
-from apache_beam.io.gcp.internal.clients.storage import storage_v1_messages as messages
+from apache_beam.io.gcp.internal.clients.storage import \
+ storage_v1_messages as messages
class StorageV1(base_api.BaseApiClient):
diff --git a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
index 3c180a652bc..95a31c430da 100644
--- a/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
+++ b/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py
@@ -21,10 +21,11 @@
"""
# NOTE: This file is autogenerated and should not be edited by hand.
+from __future__ import absolute_import
+
from apitools.base.protorpclite import message_types as _message_types
from apitools.base.protorpclite import messages as _messages
-from apitools.base.py import encoding
-from apitools.base.py import extra_types
+from apitools.base.py import encoding, extra_types
package = 'storage'
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py
b/sdks/python/apache_beam/io/gcp/pubsub.py
index 6db45bdbfa5..a57bef8f395 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -33,8 +33,9 @@
from __future__ import absolute_import
import re
+from builtins import object
-from six import text_type
+from past.builtins import basestring
from apache_beam import coders
from apache_beam.io.iobase import Read
@@ -49,7 +50,6 @@
except ImportError:
pubsub_pb2 = None
-
__all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
'WriteStringsToPubSub']
@@ -173,7 +173,7 @@ def expand(self, pvalue):
p = (pvalue.pipeline
| ReadFromPubSub(self.topic, self.subscription, self.id_label)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
- p.element_type = text_type
+ p.element_type = basestring
return p
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py
b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 165c072abb1..01cb0c072ef 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -18,9 +18,14 @@
"""Unit tests for PubSub sources and sinks."""
+from __future__ import absolute_import
+
import functools
import logging
import unittest
+from builtins import object
+from builtins import range
+from builtins import zip
import hamcrest as hc
import mock
diff --git a/sdks/python/apache_beam/io/gcp/tests/__init__.py
b/sdks/python/apache_beam/io/gcp/tests/__init__.py
index cce3acad34a..6569e3fe5de 100644
--- a/sdks/python/apache_beam/io/gcp/tests/__init__.py
+++ b/sdks/python/apache_beam/io/gcp/tests/__init__.py
@@ -14,3 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 8241a228cf0..c33f0db4998 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -17,6 +17,8 @@
"""Bigquery data verifier for end-to-end test."""
+from __future__ import absolute_import
+
import logging
from hamcrest.core.base_matcher import BaseMatcher
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
index a0977189e06..e6ae9a06dc8 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher_test.py
@@ -17,6 +17,8 @@
"""Unit test for Bigquery verifier"""
+from __future__ import absolute_import
+
import logging
import unittest
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 695bfcd70f6..da906c6c6fe 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -17,6 +17,8 @@
"""PubSub verifier used for end-to-end test."""
+from __future__ import absolute_import
+
import logging
import time
from collections import Counter
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py
b/sdks/python/apache_beam/io/gcp/tests/utils.py
index b4b4ba8b11f..81fc4736c04 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -18,6 +18,8 @@
"""Utility methods for testing on GCP."""
+from __future__ import absolute_import
+
import logging
from apache_beam.utils import retry
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils_test.py
b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
index a5c74bb3424..4ea65a9d86b 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils_test.py
@@ -17,6 +17,8 @@
"""Unittest for GCP testing utils."""
+from __future__ import absolute_import
+
import logging
import unittest
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py
b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 1fdd0713dd1..61c16dc4c24 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -24,6 +24,7 @@
import logging
import posixpath
import re
+from builtins import zip
import hdfs
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index d35b8d5b4e0..cce0ac72a33 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -23,6 +23,9 @@
import logging
import posixpath
import unittest
+from builtins import object
+
+from future.utils import itervalues
from apache_beam.io import hadoopfilesystem as hdfs
from apache_beam.io.filesystem import BeamIOError
@@ -125,7 +128,7 @@ def list(self, path, status=False):
'list must be called on a directory, got file: %s' % path)
result = []
- for file in self.files.itervalues():
+ for file in itervalues(self.files):
if file.stat['path'].startswith(path):
fs = file.get_file_status()
result.append((fs[hdfs._FILE_STATUS_PATH_SUFFIX], fs))
diff --git a/sdks/python/apache_beam/io/iobase.py
b/sdks/python/apache_beam/io/iobase.py
index 7f9750b9609..34a50b54dbe 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -29,9 +29,13 @@
the sink.
"""
+from __future__ import absolute_import
+
import logging
import random
import uuid
+from builtins import object
+from builtins import range
from collections import namedtuple
from apache_beam import coders
diff --git a/sdks/python/apache_beam/io/localfilesystem.py
b/sdks/python/apache_beam/io/localfilesystem.py
index 7e7f88d4e45..23a1e8a846c 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -20,6 +20,7 @@
import os
import shutil
+from builtins import zip
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py
b/sdks/python/apache_beam/io/localfilesystem_test.py
index 5f825eec379..d6d8eb4c217 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -18,6 +18,8 @@
"""Unit tests for LocalFileSystem."""
+from __future__ import absolute_import
+
import filecmp
import logging
import os
@@ -146,7 +148,7 @@ def test_match_file_exception(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Match operation failed') as error:
self.fs.match([None])
- self.assertEqual(error.exception.exception_details.keys(), [None])
+ self.assertEqual(list(error.exception.exception_details.keys()), [None])
def test_match_glob(self):
path1 = os.path.join(self.tmpdir, 'f1')
@@ -190,7 +192,8 @@ def test_copy_error(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Copy operation failed') as error:
self.fs.copy([path1], [path2])
- self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+ self.assertEqual(list(error.exception.exception_details.keys()),
+ [(path1, path2)])
def test_copy_directory(self):
path_t1 = os.path.join(self.tmpdir, 't1')
@@ -222,7 +225,8 @@ def test_rename_error(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Rename operation failed') as error:
self.fs.rename([path1], [path2])
- self.assertEqual(error.exception.exception_details.keys(), [(path1, path2)])
+ self.assertEqual(list(error.exception.exception_details.keys()),
+ [(path1, path2)])
def test_rename_directory(self):
path_t1 = os.path.join(self.tmpdir, 't1')
@@ -273,7 +277,7 @@ def test_delete_error(self):
with self.assertRaisesRegexp(BeamIOError,
r'^Delete operation failed') as error:
self.fs.delete([path1])
- self.assertEqual(error.exception.exception_details.keys(), [path1])
+ self.assertEqual(list(error.exception.exception_details.keys()), [path1])
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/range_trackers.py
b/sdks/python/apache_beam/io/range_trackers.py
index 2da8736b114..2613b364b6f 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -17,18 +17,23 @@
"""iobase.RangeTracker implementations provided with Apache Beam.
"""
+from __future__ import absolute_import
+from __future__ import division
import logging
import math
import threading
-from six import integer_types
-
from apache_beam.io import iobase
__all__ = ['OffsetRangeTracker', 'LexicographicKeyRangeTracker',
'OrderedPositionRangeTracker', 'UnsplittableRangeTracker']
+try:
+ long # pylint: disable=long-builtin
+except NameError:
+ long = int
+
class OffsetRangeTracker(iobase.RangeTracker):
"""A 'RangeTracker' for non-negative positions of type 'long'."""
@@ -47,9 +52,9 @@ def __init__(self, start, end):
raise ValueError('Start offset must not be \'None\'')
if end is None:
raise ValueError('End offset must not be \'None\'')
- assert isinstance(start, integer_types)
+ assert isinstance(start, (int, long))
if end != self.OFFSET_INFINITY:
- assert isinstance(end, integer_types)
+ assert isinstance(end, (int, long))
assert start <= end
@@ -123,7 +128,7 @@ def set_current_position(self, record_start):
self._last_record_start = record_start
def try_split(self, split_offset):
- assert isinstance(split_offset, integer_types)
+ assert isinstance(split_offset, (int, long))
with self._lock:
if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
logging.debug('refusing to split %r at %d: stop position unspecified',
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py
b/sdks/python/apache_beam/io/range_trackers_test.py
index 6b8f7c7139c..f0c7ca4ace8 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -16,16 +16,21 @@
#
"""Unit tests for the range_trackers module."""
+from __future__ import absolute_import
+from __future__ import division
import copy
import logging
import math
import unittest
-from six import integer_types
-
from apache_beam.io import range_trackers
+try:
+ long # pylint: disable=long-builtin
+except NameError:
+ long = int
+
class OffsetRangeTrackerTest(unittest.TestCase):
@@ -102,7 +107,7 @@ def test_get_position_for_fraction_dense(self):
# Position must be an integer type.
self.assertTrue(isinstance(tracker.position_at_fraction(0.0),
- integer_types))
+ (int, long)))
# [3, 3) represents 0.0 of [3, 6)
self.assertEqual(3, tracker.position_at_fraction(0.0))
# [3, 4) represents up to 1/3 of [3, 6)
@@ -163,7 +168,7 @@ def test_try_split_points(self):
tracker = range_trackers.OffsetRangeTracker(100, 400)
def dummy_callback(stop_position):
- return int(stop_position / 5)
+ return int(stop_position // 5)
tracker.set_split_points_unclaimed_callback(dummy_callback)
diff --git a/sdks/python/apache_beam/io/restriction_trackers.py
b/sdks/python/apache_beam/io/restriction_trackers.py
index 8aeecba3ae1..014125f2900 100644
--- a/sdks/python/apache_beam/io/restriction_trackers.py
+++ b/sdks/python/apache_beam/io/restriction_trackers.py
@@ -16,8 +16,11 @@
#
"""`iobase.RestrictionTracker` implementations provided with Apache Beam."""
+from __future__ import absolute_import
+from __future__ import division
import threading
+from builtins import object
from apache_beam.io.iobase import RestrictionTracker
from apache_beam.io.range_trackers import OffsetRangeTracker
@@ -48,7 +51,7 @@ def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
remaining = self.stop - current_split_stop
# Avoiding a small split at the end.
- if (remaining < desired_num_offsets_per_split / 4 or
+ if (remaining < desired_num_offsets_per_split // 4 or
remaining < min_num_offsets_per_split):
current_split_stop = self.stop
diff --git a/sdks/python/apache_beam/io/restriction_trackers_test.py
b/sdks/python/apache_beam/io/restriction_trackers_test.py
index e8a799f28f1..2820426690f 100644
--- a/sdks/python/apache_beam/io/restriction_trackers_test.py
+++ b/sdks/python/apache_beam/io/restriction_trackers_test.py
@@ -17,6 +17,8 @@
"""Unit tests for the range_trackers module."""
+from __future__ import absolute_import
+
import logging
import unittest
diff --git a/sdks/python/apache_beam/io/source_test_utils.py
b/sdks/python/apache_beam/io/source_test_utils.py
index 05e6e9c0d6f..f60fafb913e 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -43,10 +43,15 @@
* apache_beam.io.source_test_utils_test.py
* apache_beam.io.avroio_test.py
"""
+from __future__ import absolute_import
+from __future__ import division
import logging
import threading
import weakref
+from builtins import next
+from builtins import object
+from builtins import range
from collections import namedtuple
from multiprocessing.pool import ThreadPool
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py
b/sdks/python/apache_beam/io/source_test_utils_test.py
index 9a619c4821f..94eb4401f6b 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -15,9 +15,12 @@
# limitations under the License.
#
+from __future__ import absolute_import
+
import logging
import tempfile
import unittest
+from builtins import range
import apache_beam.io.source_test_utils as source_test_utils
from apache_beam.io.filebasedsource_test import LineSource
diff --git a/sdks/python/apache_beam/io/sources_test.py
b/sdks/python/apache_beam/io/sources_test.py
index 8f885e59fb0..40629ae8395 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -16,6 +16,7 @@
#
"""Unit tests for the sources framework."""
+from __future__ import absolute_import
import logging
import os
diff --git a/sdks/python/apache_beam/io/textio.py
b/sdks/python/apache_beam/io/textio.py
index f5fd2da6056..fee174bb3a1 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -21,6 +21,8 @@
from __future__ import absolute_import
import logging
+from builtins import object
+from builtins import range
from functools import partial
from six import integer_types
diff --git a/sdks/python/apache_beam/io/textio_test.py
b/sdks/python/apache_beam/io/textio_test.py
index 324f52adf75..3606897049d 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -16,6 +16,8 @@
#
"""Tests for textio module."""
+from __future__ import absolute_import
+from __future__ import division
import bz2
import glob
@@ -25,6 +27,7 @@
import shutil
import tempfile
import unittest
+from builtins import range
import apache_beam as beam
import apache_beam.io.source_test_utils as source_test_utils
diff --git a/sdks/python/apache_beam/io/tfrecordio.py
b/sdks/python/apache_beam/io/tfrecordio.py
index 0290bfaf270..989247a96ee 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -20,6 +20,7 @@
import logging
import struct
+from builtins import object
from functools import partial
import crcmod
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py
b/sdks/python/apache_beam/io/tfrecordio_test.py
index c540cfae050..ded8d794159 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -15,18 +15,22 @@
# limitations under the License.
#
+from __future__ import absolute_import
+
import binascii
-import cStringIO
import glob
import gzip
+import io
import logging
import os
import pickle
import random
import re
import unittest
+from builtins import range
import crcmod
+from future import standard_library
import apache_beam as beam
from apache_beam import Create
@@ -42,6 +46,8 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+standard_library.install_aliases()
+
try:
import tensorflow as tf # pylint: disable=import-error
except ImportError:
@@ -81,15 +87,15 @@ def setUp(self):
self.record = binascii.a2b_base64(FOO_RECORD_BASE64)
def _as_file_handle(self, contents):
- result = cStringIO.StringIO()
+ result = io.BytesIO()
result.write(contents)
- result.reset()
+ result.seek(0)
return result
def _increment_value_at_index(self, value, index):
l = list(value)
- l[index] = chr(ord(l[index]) + 1)
- return ''.join(l)
+ l[index] = bytes(ord(l[index]) + 1)
+ return "".join(l)
def _test_error(self, record, error_text):
with self.assertRaisesRegexp(ValueError, re.escape(error_text)):
@@ -122,7 +128,7 @@ def test_masked_crc32c_crcmod(self):
'\x03\x00\x00\x00\x00\x00\x00\x00', crc32c_fn=crc32c_fn))
def test_write_record(self):
- file_handle = cStringIO.StringIO()
+ file_handle = io.BytesIO()
_TFRecordUtil.write_record(file_handle, 'foo')
self.assertEqual(self.record, file_handle.getvalue())
@@ -143,9 +149,9 @@ def test_read_record_invalid_data_mask(self):
def test_compatibility_read_write(self):
for record in ['', 'blah', 'another blah']:
- file_handle = cStringIO.StringIO()
+ file_handle = io.BytesIO()
_TFRecordUtil.write_record(file_handle, record)
- file_handle.reset()
+ file_handle.seek(0)
actual = _TFRecordUtil.read_record(file_handle)
self.assertEqual(record, actual)
@@ -391,7 +397,7 @@ class TestEnd2EndWriteAndRead(unittest.TestCase):
def create_inputs(self):
input_array = [[random.random() - 0.5 for _ in range(15)]
for _ in range(12)]
- memfile = cStringIO.StringIO()
+ memfile = io.BytesIO()
pickle.dump(input_array, memfile)
return memfile.getvalue()
@@ -445,7 +451,7 @@ def test_end2end_example_proto(self):
file_path_prefix = temp_dir.create_temp_file('result')
example = tf.train.Example()
- example.features.feature['int'].int64_list.value.extend(range(3))
+ example.features.feature['int'].int64_list.value.extend(list(range(3)))
example.features.feature['bytes'].bytes_list.value.extend(
[b'foo', b'bar'])
diff --git a/sdks/python/apache_beam/io/utils.py
b/sdks/python/apache_beam/io/utils.py
index d6b312d7a67..1dfadb5b73c 100644
--- a/sdks/python/apache_beam/io/utils.py
+++ b/sdks/python/apache_beam/io/utils.py
@@ -19,6 +19,10 @@
on transforms.ptransform_test.test_read_metrics.
"""
+from __future__ import absolute_import
+
+from builtins import range
+
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.metrics import Metrics
diff --git a/sdks/python/apache_beam/io/vcfio.py
b/sdks/python/apache_beam/io/vcfio.py
index 94c740b9964..cd3b617e2c7 100644
--- a/sdks/python/apache_beam/io/vcfio.py
+++ b/sdks/python/apache_beam/io/vcfio.py
@@ -24,9 +24,11 @@
import logging
import traceback
+from builtins import next
+from builtins import object
from collections import namedtuple
-from six import string_types
+from future.utils import iteritems
import vcf
@@ -38,8 +40,10 @@
from apache_beam.transforms import PTransform
try:
- long # Python 2
+ unicode # pylint: disable=unicode-builtin
+ int # Python 2
except NameError:
+ unicode = str
long = int # Python 3
@@ -320,6 +324,9 @@ def __iter__(self):
return self
def next(self):
+ return self.__next__()
+
+ def __next__(self):
try:
record = next(self._vcf_reader)
return self._convert_to_variant_record(record, self._vcf_reader.infos,
@@ -371,7 +378,7 @@ def _convert_to_variant_record(self, record, infos,
formats):
if record.FILTER is not None:
variant.filters.extend(
record.FILTER if record.FILTER else [PASS_FILTER])
- for k, v in record.INFO.iteritems():
+ for k, v in iteritems(record.INFO):
# Special case: END info value specifies end of the record, so adjust
# variant.end and do not include it as part of variant.info.
if k == END_INFO_KEY:
@@ -406,7 +413,7 @@ def _convert_to_variant_record(self, record, infos,
formats):
# Note: this is already done for INFO fields in PyVCF.
if (field in formats and
formats[field].num is None and
- isinstance(data, (int, float, long, string_types, bool))):
+ isinstance(data, (int, float, int, str, unicode, bool))):
data = [data]
call.info[field] = data
variant.calls.append(call)
diff --git a/sdks/python/apache_beam/io/vcfio_test.py
b/sdks/python/apache_beam/io/vcfio_test.py
index 029515fe341..25b5d0cfa09 100644
--- a/sdks/python/apache_beam/io/vcfio_test.py
+++ b/sdks/python/apache_beam/io/vcfio_test.py
@@ -17,6 +17,8 @@
"""Tests for vcfio module."""
+from __future__ import absolute_import
+
import logging
import os
import unittest
@@ -74,9 +76,10 @@ def get_full_dir():
def _variant_comparator(v1, v2):
if v1.reference_name == v2.reference_name:
if v1.start == v2.start:
- return cmp(v1.end, v2.end)
- return cmp(v1.start, v2.start)
- return cmp(v1.reference_name, v2.reference_name)
+ return (v1.end > v2.end) - (v1.end < v2.end)
+ return (v1.start > v2.start) - (v1.start < v2.start)
+ return (v1.reference_name > v2.reference_name) - \
+ (v1.reference_name < v2.reference_name)
# Helper method for verifying equal count on PCollection.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 120037)
Time Spent: 6h (was: 5h 50m)
> Futurize and fix python 2 compatibility for io subpackage
> ---------------------------------------------------------
>
> Key: BEAM-4000
> URL: https://issues.apache.org/jira/browse/BEAM-4000
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Robbe
> Assignee: Matthias Feys
> Priority: Major
> Time Spent: 6h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)