Vancior commented on code in PR #20499:
URL: https://github.com/apache/flink/pull/20499#discussion_r941101664
##########
flink-python/pyflink/datastream/formats/parquet.py:
##########
@@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') ->
'BulkWriterFactory':
jvm = get_gateway().jvm
JAvroParquetWriters =
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
return
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
+class ParquetRowDataWriter(object):
+ """
+ Convenient builder to create a :class:`BulkWriterFactory` that writes Rows
with a defined
+ :class:`RowType` into Parquet files in a batch fashion.
+
+ .. versionadded:: 1.16.0
+ """
+
+ @staticmethod
+ def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration]
= None,
+ utc_timestamp: bool = False) -> 'BulkWriterFactory':
+ """
+ Create a :class:`RowDataBulkWriterFactory` that writes Rows records
with a defined
+ :class:`RowType` into Parquet files in a batch fashion.
+
+ Example:
+ ::
+
+ >>> row_type = DataTypes.ROW([
+ ... DataTypes.FIELD('string', DataTypes.STRING()),
+ ... DataTypes.FIELD('int_array',
DataTypes.ARRAY(DataTypes.INT()))
+ ... ])
+ >>> row_type_info = Types.ROW_NAMED(
+ ... ['string', 'int_array'],
+ ... [Types.STRING(), Types.LIST(Types.INT())]
+ ... )
+ >>> sink = FileSink.for_bulk_format(
+ ... OUTPUT_DIR, ParquetRowDataWriter.for_row_type(
+ ... row_type,
+ ... hadoop_config=Configuration(),
+ ... utc_timestamp=True,
+ ... )
+ ... ).build()
Review Comment:
Why no `build`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]