This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2db2f6e1fb3 [BEAM-14236] Parquet IO support for list to conform with
Apache Parquet specification for Python SDK.
new 5b0e92f6914 Merge pull request #17279 from Shiv22Wabale/BEAM-14236
2db2f6e1fb3 is described below
commit 2db2f6e1fb3895a7003467451b9e6332880ed40a
Author: Luke Cwik <[email protected]>
AuthorDate: Mon Apr 4 14:32:41 2022 -0700
[BEAM-14236] Parquet IO support for list to conform with Apache Parquet
specification for Python SDK.
---
sdks/python/apache_beam/io/parquetio.py | 21 +++++++++-
sdks/python/apache_beam/io/parquetio_test.py | 58 +++++++++++++++++++++++++++-
2 files changed, 77 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/io/parquetio.py
b/sdks/python/apache_beam/io/parquetio.py
index 872140d5d7f..67edb832099 100644
--- a/sdks/python/apache_beam/io/parquetio.py
+++ b/sdks/python/apache_beam/io/parquetio.py
@@ -369,6 +369,7 @@ class WriteToParquet(PTransform):
record_batch_size=1000,
codec='none',
use_deprecated_int96_timestamps=False,
+ use_compliant_nested_type=False,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
@@ -428,6 +429,7 @@ class WriteToParquet(PTransform):
by the pyarrow specification is accepted.
use_deprecated_int96_timestamps: Write nanosecond resolution timestamps to
INT96 Parquet format. Defaults to False.
+ use_compliant_nested_type: Write compliant Parquet nested type (lists).
file_name_suffix: Suffix for the files written.
num_shards: The number of files (shards) used for output. If not set, the
service will decide on the optimal number of shards.
@@ -456,6 +458,7 @@ class WriteToParquet(PTransform):
row_group_buffer_size,
record_batch_size,
use_deprecated_int96_timestamps,
+ use_compliant_nested_type,
file_name_suffix,
num_shards,
shard_name_template,
@@ -476,6 +479,7 @@ def _create_parquet_sink(
row_group_buffer_size,
record_batch_size,
use_deprecated_int96_timestamps,
+ use_compliant_nested_type,
file_name_suffix,
num_shards,
shard_name_template,
@@ -488,6 +492,7 @@ def _create_parquet_sink(
row_group_buffer_size,
record_batch_size,
use_deprecated_int96_timestamps,
+ use_compliant_nested_type,
file_name_suffix,
num_shards,
shard_name_template,
@@ -505,6 +510,7 @@ class _ParquetSink(filebasedsink.FileBasedSink):
row_group_buffer_size,
record_batch_size,
use_deprecated_int96_timestamps,
+ use_compliant_nested_type,
file_name_suffix,
num_shards,
shard_name_template,
@@ -528,6 +534,12 @@ class _ParquetSink(filebasedsink.FileBasedSink):
f"codec. Your pyarrow version: {pa.__version__}")
self._row_group_buffer_size = row_group_buffer_size
self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
+ if use_compliant_nested_type and ARROW_MAJOR_VERSION < 4:
+ raise ValueError(
+ "With ARROW-11497, use_compliant_nested_type is only supported in "
+ "pyarrow version >= 4.x, please use a different pyarrow version. "
+ f"Your pyarrow version: {pa.__version__}")
+ self._use_compliant_nested_type = use_compliant_nested_type
self._buffer = [[] for _ in range(len(schema.names))]
self._buffer_size = record_batch_size
self._record_batches = []
@@ -536,11 +548,18 @@ class _ParquetSink(filebasedsink.FileBasedSink):
def open(self, temp_path):
self._file_handle = super().open(temp_path)
+ if ARROW_MAJOR_VERSION < 4:
+ return pq.ParquetWriter(
+ self._file_handle,
+ self._schema,
+ compression=self._codec,
+ use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
return pq.ParquetWriter(
self._file_handle,
self._schema,
compression=self._codec,
- use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
+ use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
+ use_compliant_nested_type=self._use_compliant_nested_type)
def write_record(self, writer, value):
if len(self._buffer[0]) >= self._buffer_size:
diff --git a/sdks/python/apache_beam/io/parquetio_test.py
b/sdks/python/apache_beam/io/parquetio_test.py
index 0232bac45e0..6f47a0a5558 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -57,6 +57,8 @@ except ImportError:
pl = None
pq = None
+ARROW_MAJOR_VERSION, _, _ = map(int, pa.__version__.split('.'))
+
@unittest.skipIf(pa is None, "PyArrow is not installed.")
@pytest.mark.uses_pyarrow
@@ -109,6 +111,37 @@ class TestParquet(unittest.TestCase):
('favorite_number', pa.timestamp('ns'), False),
('favorite_color', pa.string())])
+ self.RECORDS_NESTED = [{
+ 'items': [
+ {
+ 'name': 'Thomas',
+ 'favorite_number': 1,
+ 'favorite_color': 'blue'
+ },
+ {
+ 'name': 'Henry',
+ 'favorite_number': 3,
+ 'favorite_color': 'green'
+ },
+ ]
+ },
+ {
+ 'items': [
+ {
+ 'name': 'Toby',
+ 'favorite_number': 7,
+ 'favorite_color': 'brown'
+ },
+ ]
+ }]
+
+ self.SCHEMA_NESTED = pa.schema([(
+ 'items',
+ pa.list_(
+ pa.struct([('name', pa.string(), False),
+ ('favorite_number', pa.int64(), False),
+ ('favorite_color', pa.string())])))])
+
def tearDown(self):
shutil.rmtree(self.temp_dir)
@@ -254,6 +287,7 @@ class TestParquet(unittest.TestCase):
1024 * 1024,
1000,
False,
+ False,
'.end',
0,
None,
@@ -314,6 +348,28 @@ class TestParquet(unittest.TestCase):
| Map(json.dumps)
assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
+ def test_sink_transform_compliant_nested_type(self):
+ if ARROW_MAJOR_VERSION < 4:
+ return unittest.skip(
+ 'Writing with compliant nested type is only '
+ 'supported in pyarrow 4.x and above')
+ with TemporaryDirectory() as tmp_dirname:
+ path = os.path.join(tmp_dirname + 'tmp_filename')
+ with TestPipeline() as p:
+ _ = p \
+ | Create(self.RECORDS_NESTED) \
+ | WriteToParquet(
+ path, self.SCHEMA_NESTED, num_shards=1,
+ shard_name_template='', use_compliant_nested_type=True)
+ with TestPipeline() as p:
+ # json used for stable sortability
+ readback = \
+ p \
+ | ReadFromParquet(path) \
+ | Map(json.dumps)
+ assert_that(
+ readback, equal_to([json.dumps(r) for r in self.RECORDS_NESTED]))
+
def test_batched_read(self):
with TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
@@ -337,7 +393,7 @@ class TestParquet(unittest.TestCase):
param(compression_type='zstd')
])
def test_sink_transform_compressed(self, compression_type):
- if compression_type == 'lz4' and int(pa.__version__.split('.')[0]) == 1:
+ if compression_type == 'lz4' and ARROW_MAJOR_VERSION == 1:
return unittest.skip(
"Writing with LZ4 compression is not supported in "
"pyarrow 1.x")