peridotml commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r967534919
##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -612,3 +613,111 @@ def _flush_buffer(self):
if b is not None:
size = size + b.size
self._record_batches_byte_size = self._record_batches_byte_size + size
+
+
+class WriteToParquetBatched(PTransform):
+ """Initialize a WriteToParquetBatched transform.
+
+ Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
+  batches. Each batch is a pa.Table. The schema must be specified, as in the
+  example below.
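+
+  A minimal usage sketch; ``filename`` and the two-field schema are
+  illustrative assumptions, not prescribed by the transform::
+
+    with beam.Pipeline() as p:
+      # Any pa.Table whose schema matches the one passed to the transform
+      # can be written; this table is a stand-in.
+      table = pa.Table.from_pydict({'name': ['foo'], 'age': [10]})
+      _ = (
+          p
+          | beam.Create([table])
+          | WriteToParquetBatched(
+              filename,
+              pa.schema([('name', pa.string()), ('age', pa.int64())])))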
+ """
+ def __init__(
+ self,
+ file_path_prefix,
+ schema=None,
+ codec='none',
+ use_deprecated_int96_timestamps=False,
+ use_compliant_nested_type=False,
+ file_name_suffix='',
+ num_shards=0,
+ shard_name_template=None,
+ mime_type='application/x-parquet',
+ ):
+ super().__init__()
+ self._sink = _BatchedParquetSink(
+ file_path_prefix=file_path_prefix,
+ schema=schema,
+ codec=codec,
+ use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
+ use_compliant_nested_type=use_compliant_nested_type,
+ file_name_suffix=file_name_suffix,
+ num_shards=num_shards,
+ shard_name_template=shard_name_template,
+ mime_type=mime_type,
+ )
+
+ def expand(self, pcoll):
+ return pcoll | Write(self._sink)
+
+ def display_data(self):
+ return {'sink_dd': self._sink}
+
+
+class _BatchedParquetSink(filebasedsink.FileBasedSink):
+ """A sink for parquet files from batches."""
+ def __init__(
+ self,
+ file_path_prefix,
+ schema,
+ codec,
+ use_deprecated_int96_timestamps,
+ use_compliant_nested_type,
+ file_name_suffix,
+ num_shards,
+ shard_name_template,
+ mime_type):
+ super().__init__(
+ file_path_prefix,
+ file_name_suffix=file_name_suffix,
+ num_shards=num_shards,
+ shard_name_template=shard_name_template,
+ coder=None,
+ mime_type=mime_type,
+ # Compression happens at the block level using the supplied codec, and
+ # not at the file level.
+ compression_type=CompressionTypes.UNCOMPRESSED)
+ self._schema = schema
+ self._codec = codec
+ if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
+ raise ValueError(
+ "Due to ARROW-9424, writing with LZ4 compression is not supported in
"
+ "pyarrow 1.x, please use a different pyarrow version or a different "
+ f"codec. Your pyarrow version: {pa.__version__}")
+ self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
+ if use_compliant_nested_type and ARROW_MAJOR_VERSION < 4:
+ raise ValueError(
+ "With ARROW-11497, use_compliant_nested_type is only supported in "
+ "pyarrow version >= 4.x, please use a different pyarrow version. "
+ f"Your pyarrow version: {pa.__version__}")
+ self._use_compliant_nested_type = use_compliant_nested_type
+ self._file_handle = None
+
+ def open(self, temp_path):
+ self._file_handle = super().open(temp_path)
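+    # use_compliant_nested_type is only accepted by pq.ParquetWriter on
+    # pyarrow >= 4 (ARROW-11497, checked in __init__ above), so the writer
+    # must be constructed without it on older versions.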
+ if ARROW_MAJOR_VERSION < 4:
+ return pq.ParquetWriter(
+ self._file_handle,
+ self._schema,
+ compression=self._codec,
+          use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
+ return pq.ParquetWriter(
+ self._file_handle,
+ self._schema,
+ compression=self._codec,
+ use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
+ use_compliant_nested_type=self._use_compliant_nested_type)
Review Comment:
@TheNeuralBit Brilliant idea! I will implement it this weekend.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]