This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db2f6e1fb3 [BEAM-14236] Parquet IO support for list to conform with Apache Parquet specification for Python SDK.
     new 5b0e92f6914 Merge pull request #17279 from Shiv22Wabale/BEAM-14236
2db2f6e1fb3 is described below

commit 2db2f6e1fb3895a7003467451b9e6332880ed40a
Author: Luke Cwik <[email protected]>
AuthorDate: Mon Apr 4 14:32:41 2022 -0700

    [BEAM-14236] Parquet IO support for list to conform with Apache Parquet specification for Python SDK.
---
 sdks/python/apache_beam/io/parquetio.py      | 21 +++++++++-
 sdks/python/apache_beam/io/parquetio_test.py | 58 +++++++++++++++++++++++++++-
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/parquetio.py b/sdks/python/apache_beam/io/parquetio.py
index 872140d5d7f..67edb832099 100644
--- a/sdks/python/apache_beam/io/parquetio.py
+++ b/sdks/python/apache_beam/io/parquetio.py
@@ -369,6 +369,7 @@ class WriteToParquet(PTransform):
       record_batch_size=1000,
       codec='none',
       use_deprecated_int96_timestamps=False,
+      use_compliant_nested_type=False,
       file_name_suffix='',
       num_shards=0,
       shard_name_template=None,
@@ -428,6 +429,7 @@ class WriteToParquet(PTransform):
         by the pyarrow specification is accepted.
       use_deprecated_int96_timestamps: Write nanosecond resolution timestamps to
         INT96 Parquet format. Defaults to False.
+      use_compliant_nested_type: Write compliant Parquet nested type (lists).
       file_name_suffix: Suffix for the files written.
       num_shards: The number of files (shards) used for output. If not set, the
         service will decide on the optimal number of shards.
@@ -456,6 +458,7 @@ class WriteToParquet(PTransform):
           row_group_buffer_size,
           record_batch_size,
           use_deprecated_int96_timestamps,
+          use_compliant_nested_type,
           file_name_suffix,
           num_shards,
           shard_name_template,
@@ -476,6 +479,7 @@ def _create_parquet_sink(
     row_group_buffer_size,
     record_batch_size,
     use_deprecated_int96_timestamps,
+    use_compliant_nested_type,
     file_name_suffix,
     num_shards,
     shard_name_template,
@@ -488,6 +492,7 @@ def _create_parquet_sink(
         row_group_buffer_size,
         record_batch_size,
         use_deprecated_int96_timestamps,
+        use_compliant_nested_type,
         file_name_suffix,
         num_shards,
         shard_name_template,
@@ -505,6 +510,7 @@ class _ParquetSink(filebasedsink.FileBasedSink):
       row_group_buffer_size,
       record_batch_size,
       use_deprecated_int96_timestamps,
+      use_compliant_nested_type,
       file_name_suffix,
       num_shards,
       shard_name_template,
@@ -528,6 +534,12 @@ class _ParquetSink(filebasedsink.FileBasedSink):
           f"codec. Your pyarrow version: {pa.__version__}")
     self._row_group_buffer_size = row_group_buffer_size
     self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
+    if use_compliant_nested_type and ARROW_MAJOR_VERSION < 4:
+      raise ValueError(
+          "With ARROW-11497, use_compliant_nested_type is only supported in "
+          "pyarrow version >= 4.x, please use a different pyarrow version. "
+          f"Your pyarrow version: {pa.__version__}")
+    self._use_compliant_nested_type = use_compliant_nested_type
     self._buffer = [[] for _ in range(len(schema.names))]
     self._buffer_size = record_batch_size
     self._record_batches = []
@@ -536,11 +548,18 @@ class _ParquetSink(filebasedsink.FileBasedSink):
 
   def open(self, temp_path):
     self._file_handle = super().open(temp_path)
+    if ARROW_MAJOR_VERSION < 4:
+      return pq.ParquetWriter(
+          self._file_handle,
+          self._schema,
+          compression=self._codec,
+          use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
     return pq.ParquetWriter(
         self._file_handle,
         self._schema,
         compression=self._codec,
-        use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
+        use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
+        use_compliant_nested_type=self._use_compliant_nested_type)
 
   def write_record(self, writer, value):
     if len(self._buffer[0]) >= self._buffer_size:
diff --git a/sdks/python/apache_beam/io/parquetio_test.py b/sdks/python/apache_beam/io/parquetio_test.py
index 0232bac45e0..6f47a0a5558 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -57,6 +57,8 @@ except ImportError:
   pl = None
   pq = None
 
+ARROW_MAJOR_VERSION, _, _ = map(int, pa.__version__.split('.'))
+
 
 @unittest.skipIf(pa is None, "PyArrow is not installed.")
 @pytest.mark.uses_pyarrow
@@ -109,6 +111,37 @@ class TestParquet(unittest.TestCase):
                                ('favorite_number', pa.timestamp('ns'), False),
                                ('favorite_color', pa.string())])
 
+    self.RECORDS_NESTED = [{
+        'items': [
+            {
+                'name': 'Thomas',
+                'favorite_number': 1,
+                'favorite_color': 'blue'
+            },
+            {
+                'name': 'Henry',
+                'favorite_number': 3,
+                'favorite_color': 'green'
+            },
+        ]
+    },
+                           {
+                               'items': [
+                                   {
+                                       'name': 'Toby',
+                                       'favorite_number': 7,
+                                       'favorite_color': 'brown'
+                                   },
+                               ]
+                           }]
+
+    self.SCHEMA_NESTED = pa.schema([(
+        'items',
+        pa.list_(
+            pa.struct([('name', pa.string(), False),
+                       ('favorite_number', pa.int64(), False),
+                       ('favorite_color', pa.string())])))])
+
   def tearDown(self):
     shutil.rmtree(self.temp_dir)
 
@@ -254,6 +287,7 @@ class TestParquet(unittest.TestCase):
         1024 * 1024,
         1000,
         False,
+        False,
         '.end',
         0,
         None,
@@ -314,6 +348,28 @@ class TestParquet(unittest.TestCase):
             | Map(json.dumps)
         assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
 
+  def test_sink_transform_compliant_nested_type(self):
+    if ARROW_MAJOR_VERSION < 4:
+      return unittest.skip(
+          'Writing with compliant nested type is only '
+          'supported in pyarrow 4.x and above')
+    with TemporaryDirectory() as tmp_dirname:
+      path = os.path.join(tmp_dirname + 'tmp_filename')
+      with TestPipeline() as p:
+        _ = p \
+        | Create(self.RECORDS_NESTED) \
+        | WriteToParquet(
+            path, self.SCHEMA_NESTED, num_shards=1,
+            shard_name_template='', use_compliant_nested_type=True)
+      with TestPipeline() as p:
+        # json used for stable sortability
+        readback = \
+            p \
+            | ReadFromParquet(path) \
+            | Map(json.dumps)
+        assert_that(
+            readback, equal_to([json.dumps(r) for r in self.RECORDS_NESTED]))
+
   def test_batched_read(self):
     with TemporaryDirectory() as tmp_dirname:
       path = os.path.join(tmp_dirname + "tmp_filename")
@@ -337,7 +393,7 @@ class TestParquet(unittest.TestCase):
       param(compression_type='zstd')
   ])
   def test_sink_transform_compressed(self, compression_type):
-    if compression_type == 'lz4' and int(pa.__version__.split('.')[0]) == 1:
+    if compression_type == 'lz4' and ARROW_MAJOR_VERSION == 1:
       return unittest.skip(
           "Writing with LZ4 compression is not supported in "
           "pyarrow 1.x")

Reply via email to