This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 11c58f2 [BEAM-11211] Allow pyarrow up to 2.x, raise helpful error
when trying to write with LZ4 compression (#13302)
11c58f2 is described below
commit 11c58f2711ac510642ef222600948ce664f42613
Author: Brian Hulette <[email protected]>
AuthorDate: Fri Nov 13 12:45:27 2020 -0800
[BEAM-11211] Allow pyarrow up to 2.x, raise helpful error when trying to
write with LZ4 compression (#13302)
* Allow pyarrow up to 2.x, raise helpful error when trying to write with
LZ4 compression
* lint
---
sdks/python/apache_beam/io/parquetio.py | 8 ++++++++
sdks/python/apache_beam/io/parquetio_test.py | 4 ++++
sdks/python/setup.py | 2 +-
3 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/io/parquetio.py
b/sdks/python/apache_beam/io/parquetio.py
index 0b19f18..50ee938 100644
--- a/sdks/python/apache_beam/io/parquetio.py
+++ b/sdks/python/apache_beam/io/parquetio.py
@@ -50,6 +50,9 @@ try:
except ImportError:
pa = None
pq = None
+ ARROW_MAJOR_VERSION = None
+else:
+ ARROW_MAJOR_VERSION, _, _ = map(int, pa.__version__.split('.'))
__all__ = [
'ReadFromParquet',
@@ -506,6 +509,11 @@ class _ParquetSink(filebasedsink.FileBasedSink):
compression_type=CompressionTypes.UNCOMPRESSED)
self._schema = schema
self._codec = codec
+ if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
+ raise ValueError(
+ "Due to ARROW-9424, writing with LZ4 compression is not supported in "
+ "pyarrow 1.x, please use a different pyarrow version or a different "
+ f"codec. Your pyarrow version: {pa.__version__}")
self._row_group_buffer_size = row_group_buffer_size
self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
self._buffer = [[] for _ in range(len(schema.names))]
diff --git a/sdks/python/apache_beam/io/parquetio_test.py
b/sdks/python/apache_beam/io/parquetio_test.py
index 49707df..23d4b41 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -336,6 +336,10 @@ class TestParquet(unittest.TestCase):
param(compression_type='zstd')
])
def test_sink_transform_compressed(self, compression_type):
+ if compression_type == 'lz4' and int(pa.__version__.split('.')[0]) == 1:
+ return unittest.skip(
+ "Writing with LZ4 compression is not supported in "
+ "pyarrow 1.x")
with TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
with TestPipeline() as p:
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 0afc602..e029298 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -147,7 +147,7 @@ REQUIRED_PACKAGES = [
'pymongo>=3.8.0,<4.0.0',
'oauth2client>=2.0.1,<5',
'protobuf>=3.12.2,<4',
- 'pyarrow>=0.15.1,<0.18.0',
+ 'pyarrow>=0.15.1,<3.0.0',
'pydot>=1.2.0,<2',
'python-dateutil>=2.8.0,<3',
'pytz>=2018.3',