dianfu commented on code in PR #20499:
URL: https://github.com/apache/flink/pull/20499#discussion_r940833339


##########
flink-python/pyflink/datastream/__init__.py:
##########
@@ -225,6 +225,9 @@
     - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
       A :class:`connectors.file_system.BulkFormat` to read columnar parquet 
files into Row data in a
       batch-processing fashion.
+    - :class:`formats.parquet.ParquetRowDataWriter`:
+      Convenient builder to create a :class:`BulkWriterFactory` that writes 
Rows with a defined

Review Comment:
   ```suggestion
         Convenient builder to create a 
:class:`connector.file_system.BulkWriterFactory` that writes Rows with a defined
   ```



##########
flink-python/pyflink/datastream/__init__.py:
##########
@@ -225,6 +225,9 @@
     - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
       A :class:`connectors.file_system.BulkFormat` to read columnar parquet 
files into Row data in a
       batch-processing fashion.
+    - :class:`formats.parquet.ParquetRowDataWriter`:
+      Convenient builder to create a :class:`BulkWriterFactory` that writes 
Rows with a defined
+      :class:`RowType` into Parquet files in a batch fashion.

Review Comment:
   ```suggestion
         :class:`pyflink.table.types.RowType` into Parquet files in a batch 
fashion.
   ```



##########
flink-python/pyflink/datastream/formats/parquet.py:
##########
@@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 
'BulkWriterFactory':
         jvm = get_gateway().jvm
         JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
         return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
+class ParquetRowDataWriter(object):

Review Comment:
   Could we group the classes in this file a bit to make it more readable? e.g. 
in the following order: AvroParquetReaders, AvroParquetWriters, 
ParquetColumnarRowInputFormat,  ParquetBulkWriter.



##########
flink-python/pyflink/datastream/formats/parquet.py:
##########
@@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 
'BulkWriterFactory':
         jvm = get_gateway().jvm
         JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
         return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
+class ParquetRowDataWriter(object):
+    """
+    Convenient builder to create a :class:`BulkWriterFactory` that writes Rows 
with a defined
+    :class:`RowType` into Parquet files in a batch fashion.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @staticmethod
+    def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] 
= None,
+                     utc_timestamp: bool = False) -> 'BulkWriterFactory':
+        """
+        Create a :class:`RowDataBulkWriterFactory` that writes Rows records 
with a defined

Review Comment:
   ```suggestion
           Create a 
:class:`pyflink.datastream.connectors.file_system.RowDataBulkWriterFactory` 
that writes Rows records with a defined
   ```
   
   Need to make sure it could generate a valid link in the documentation.



##########
flink-python/pyflink/datastream/formats/parquet.py:
##########
@@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 
'BulkWriterFactory':
         jvm = get_gateway().jvm
         JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
         return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
+class ParquetRowDataWriter(object):
+    """
+    Convenient builder to create a :class:`BulkWriterFactory` that writes Rows 
with a defined
+    :class:`RowType` into Parquet files in a batch fashion.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @staticmethod
+    def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] 
= None,
+                     utc_timestamp: bool = False) -> 'BulkWriterFactory':
+        """
+        Create a :class:`RowDataBulkWriterFactory` that writes Rows records 
with a defined
+        :class:`RowType` into Parquet files in a batch fashion.
+
+        Example:
+        ::
+
+            >>> row_type = DataTypes.ROW([
+            ...     DataTypes.FIELD('string', DataTypes.STRING()),
+            ...     DataTypes.FIELD('int_array', 
DataTypes.ARRAY(DataTypes.INT()))
+            ... ])
+            >>> row_type_info = Types.ROW_NAMED(
+            ...     ['string', 'int_array'],
+            ...     [Types.STRING(), Types.LIST(Types.INT())]
+            ... )
+            >>> sink = FileSink.for_bulk_format(
+            ...     OUTPUT_DIR, ParquetRowDataWriter.for_row_type(
+            ...         row_type,
+            ...         hadoop_config=Configuration(),
+            ...         utc_timestamp=True,
+            ...     )
+            ... ).build()

Review Comment:
   ```suggestion
               ... )
   ```



##########
flink-python/pyflink/datastream/formats/parquet.py:
##########
@@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 
'BulkWriterFactory':
         jvm = get_gateway().jvm
         JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
         return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
+class ParquetRowDataWriter(object):
+    """
+    Convenient builder to create a :class:`BulkWriterFactory` that writes Rows 
with a defined
+    :class:`RowType` into Parquet files in a batch fashion.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @staticmethod
+    def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] 
= None,
+                     utc_timestamp: bool = False) -> 'BulkWriterFactory':
+        """
+        Create a :class:`RowDataBulkWriterFactory` that writes Rows records 
with a defined
+        :class:`RowType` into Parquet files in a batch fashion.

Review Comment:
   ```suggestion
           :class:`pyflink.table.types.RowType` into Parquet files in a batch 
fashion.
   ```



##########
flink-python/pyflink/datastream/formats/parquet.py:
##########
@@ -155,3 +158,54 @@ def for_generic_record(schema: 'AvroSchema') -> 
'BulkWriterFactory':
         jvm = get_gateway().jvm
         JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
         return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
+class ParquetRowDataWriter(object):

Review Comment:
   Rename it to ParquetBulkWriter to be consistent the other naming conversion, 
e.g. CsvBulkWriter?



##########
flink-python/pyflink/fn_execution/coders.py:
##########
@@ -725,13 +725,17 @@ def from_type_info_proto(type_info):
             return RowCoder(
                 [from_type_info_proto(f.field_type) for f in 
type_info.row_type_info.fields],
                 [f.field_name for f in type_info.row_type_info.fields])
-        elif field_type_name == type_info_name.PRIMITIVE_ARRAY:
+        elif field_type_name in (
+            type_info_name.PRIMITIVE_ARRAY,

Review Comment:
   What's the purpose of this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to