TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r964915520
##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -612,3 +613,111 @@ def _flush_buffer(self):
         if b is not None:
           size = size + b.size
     self._record_batches_byte_size = self._record_batches_byte_size + size
+
+
+class WriteToParquetBatched(PTransform):
+  """Initialize a WriteToParquetBatched transform.
+
+  Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
+  batches. Each batch is a pa.Table. Schema must be specified like the
+  example below.
+  """
+  def __init__(
+      self,
+      file_path_prefix,
+      schema=None,
+      codec='none',
+      use_deprecated_int96_timestamps=False,
+      use_compliant_nested_type=False,
+      file_name_suffix='',
+      num_shards=0,
+      shard_name_template=None,
+      mime_type='application/x-parquet',
+  ):
+    super().__init__()
+    self._sink = _BatchedParquetSink(
+        file_path_prefix=file_path_prefix,
+        schema=schema,
+        codec=codec,
+        use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
+        use_compliant_nested_type=use_compliant_nested_type,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        mime_type=mime_type,
+    )
+
+  def expand(self, pcoll):
+    return pcoll | Write(self._sink)
+
+  def display_data(self):
+    return {'sink_dd': self._sink}
+
+
+class _BatchedParquetSink(filebasedsink.FileBasedSink):
+  """A sink for parquet files from batches."""
+  def __init__(
+      self,
+      file_path_prefix,
+      schema,
+      codec,
+      use_deprecated_int96_timestamps,
+      use_compliant_nested_type,
+      file_name_suffix,
+      num_shards,
+      shard_name_template,
+      mime_type):
+    super().__init__(
+        file_path_prefix,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        coder=None,
+        mime_type=mime_type,
+        # Compression happens at the block level using the supplied codec, and
+        # not at the file level.
+        compression_type=CompressionTypes.UNCOMPRESSED)
+    self._schema = schema
+    self._codec = codec
+    if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
+      raise ValueError(
+          "Due to ARROW-9424, writing with LZ4 compression is not supported in "
+          "pyarrow 1.x, please use a different pyarrow version or a different "
+          f"codec. Your pyarrow version: {pa.__version__}")
+    self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
+    if use_compliant_nested_type and ARROW_MAJOR_VERSION < 4:
+      raise ValueError(
+          "With ARROW-11497, use_compliant_nested_type is only supported in "
+          "pyarrow version >= 4.x, please use a different pyarrow version. "
+          f"Your pyarrow version: {pa.__version__}")
+    self._use_compliant_nested_type = use_compliant_nested_type
+    self._file_handle = None
+
+  def open(self, temp_path):
+    self._file_handle = super().open(temp_path)
+    if ARROW_MAJOR_VERSION < 4:
+      return pq.ParquetWriter(
+          self._file_handle,
+          self._schema,
+          compression=self._codec,
+          use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
+    return pq.ParquetWriter(
+        self._file_handle,
+        self._schema,
+        compression=self._codec,
+        use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
+        use_compliant_nested_type=self._use_compliant_nested_type)
##########
Review Comment:
Could you try to re-use this for the element-wise Parquet sink? I think we
could structure this similarly to ReadFromParquet and ReadFromParquetBatched:
both use the same underlying source that produces batches, but the former adds
an additional transform that explodes the batches into individual dictionaries:
https://github.com/apache/beam/blob/26e5d1c254de24b217e9e049733a3da778d4aa7a/sdks/python/apache_beam/io/parquetio.py#L204-L205
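
For example, something roughly like the untested sketch below (not code from this PR: the use of `BatchElements`, the `_rows_to_table` helper, and the way columns are built from the schema are all illustrative assumptions) — the element-wise write would batch dictionaries into `pa.Table`s and then delegate to the shared batched sink:

```python
import apache_beam as beam
import pyarrow as pa


def _rows_to_table(rows, schema):
  # Transpose a list of row dicts into columns and build one pa.Table batch.
  columns = [
      pa.array([row.get(field.name) for row in rows], type=field.type)
      for field in schema
  ]
  return pa.Table.from_arrays(columns, schema=schema)


class WriteToParquet(beam.PTransform):
  """Sketch: element-wise writes layered on top of WriteToParquetBatched."""
  def __init__(self, file_path_prefix, schema, **kwargs):
    self._schema = schema
    # Assumes the WriteToParquetBatched transform added in this PR is in scope.
    self._batched_write = WriteToParquetBatched(
        file_path_prefix, schema, **kwargs)

  def expand(self, pcoll):
    schema = self._schema
    return (
        pcoll
        # Group individual dict records into lists.
        | 'BatchRows' >> beam.BatchElements()
        # Convert each list of dicts into a pa.Table.
        | 'RowsToTable' >> beam.Map(_rows_to_table, schema)
        # Hand the batches to the shared batched sink.
        | 'WriteBatches' >> self._batched_write)
```

The existing row_group_buffer_size / record_batch_size knobs would need to map onto the batching step somehow, which this sketch glosses over.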