This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 11c58f2  [BEAM-11211] Allow pyarrow up to 2.x, raise helpful error when trying to write wit… (#13302)
11c58f2 is described below

commit 11c58f2711ac510642ef222600948ce664f42613
Author: Brian Hulette <[email protected]>
AuthorDate: Fri Nov 13 12:45:27 2020 -0800

    [BEAM-11211] Allow pyarrow up to 2.x, raise helpful error when trying to write wit… (#13302)
    
    * Allow pyarrow up to 2.x, raise helpful error when trying to write with LZ4 compression
    
    * lint
---
 sdks/python/apache_beam/io/parquetio.py      | 8 ++++++++
 sdks/python/apache_beam/io/parquetio_test.py | 4 ++++
 sdks/python/setup.py                         | 2 +-
 3 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/parquetio.py b/sdks/python/apache_beam/io/parquetio.py
index 0b19f18..50ee938 100644
--- a/sdks/python/apache_beam/io/parquetio.py
+++ b/sdks/python/apache_beam/io/parquetio.py
@@ -50,6 +50,9 @@ try:
 except ImportError:
   pa = None
   pq = None
+  ARROW_MAJOR_VERSION = None
+else:
+  ARROW_MAJOR_VERSION, _, _ = map(int, pa.__version__.split('.'))
 
 __all__ = [
     'ReadFromParquet',
@@ -506,6 +509,11 @@ class _ParquetSink(filebasedsink.FileBasedSink):
         compression_type=CompressionTypes.UNCOMPRESSED)
     self._schema = schema
     self._codec = codec
+    if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
+      raise ValueError(
+          "Due to ARROW-9424, writing with LZ4 compression is not supported in 
"
+          "pyarrow 1.x, please use a different pyarrow version or a different "
+          f"codec. Your pyarrow version: {pa.__version__}")
     self._row_group_buffer_size = row_group_buffer_size
     self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
     self._buffer = [[] for _ in range(len(schema.names))]
diff --git a/sdks/python/apache_beam/io/parquetio_test.py b/sdks/python/apache_beam/io/parquetio_test.py
index 49707df..23d4b41 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -336,6 +336,10 @@ class TestParquet(unittest.TestCase):
       param(compression_type='zstd')
   ])
   def test_sink_transform_compressed(self, compression_type):
+    if compression_type == 'lz4' and int(pa.__version__.split('.')[0]) == 1:
+      return unittest.skip(
+          "Writing with LZ4 compression is not supported in "
+          "pyarrow 1.x")
     with TemporaryDirectory() as tmp_dirname:
       path = os.path.join(tmp_dirname + "tmp_filename")
       with TestPipeline() as p:
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 0afc602..e029298 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -147,7 +147,7 @@ REQUIRED_PACKAGES = [
     'pymongo>=3.8.0,<4.0.0',
     'oauth2client>=2.0.1,<5',
     'protobuf>=3.12.2,<4',
-    'pyarrow>=0.15.1,<0.18.0',
+    'pyarrow>=0.15.1,<3.0.0',
     'pydot>=1.2.0,<2',
     'python-dateutil>=2.8.0,<3',
     'pytz>=2018.3',

Reply via email to