This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de8ff096a53 [FLINK-28862][python][format/parquet] Support 
ParquetBulkWriter
de8ff096a53 is described below

commit de8ff096a5344d85eb9be497902b99dd4b24e2a9
Author: Juntao Hu <[email protected]>
AuthorDate: Mon Aug 8 16:23:45 2022 +0800

    [FLINK-28862][python][format/parquet] Support ParquetBulkWriter
    
    This closes #20499.
---
 .../docs/connectors/datastream/filesystem.md       |  24 ++
 .../docs/connectors/datastream/formats/parquet.md  |   2 +-
 .../docs/connectors/datastream/filesystem.md       |  24 ++
 .../docs/connectors/datastream/formats/parquet.md  |   2 +-
 flink-python/pyflink/common/typeinfo.py            |   4 +-
 flink-python/pyflink/datastream/__init__.py        |  17 +-
 flink-python/pyflink/datastream/formats/parquet.py | 143 +++++++----
 .../datastream/formats/tests/test_parquet.py       | 273 ++++++++++++++++++---
 flink-python/pyflink/datastream/utils.py           |  10 +-
 flink-python/pyflink/fn_execution/coders.py        |  12 +-
 flink-python/pyflink/table/types.py                |  11 +-
 flink-python/pyflink/testing/test_case_utils.py    |  36 ++-
 .../flink/streaming/api/utils/PythonTypeUtils.java |   2 +-
 13 files changed, 453 insertions(+), 107 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md 
b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 76e77a1a662..f5468862a53 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -488,6 +488,30 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
+PyFlink 用户可以使用 `ParquetBulkWriter` 来创建一个将 `Row` 数据写入 Parquet 文件的 
`BulkWriterFactory` 。
+
+```python
+row_type = DataTypes.ROW([
+    DataTypes.FIELD('string', DataTypes.STRING()),
+    DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+])
+row_type_info = Types.ROW_NAMED(
+    ['string', 'int_array'],
+    [Types.STRING(), Types.LIST(Types.INT())]
+)
+sink = FileSink.for_bulk_format(
+    OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+        row_type,
+        hadoop_config=Configuration(),
+        utc_timestamp=True,
+    )
+).build()
+# 如果 ds 是一个输出类型为 RowData 的源数据源,可以使用一个 map 来转换为 Row 类型
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# 否则
+ds.sink_to(sink)
+```
+
 <a name="avro-format"></a>
 
 ##### Avro Format
diff --git a/docs/content.zh/docs/connectors/datastream/formats/parquet.md 
b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
index b8182fac55f..40afa9cdbd7 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -184,8 +184,8 @@ row_type = DataTypes.ROW([
     DataTypes.FIELD('f99', DataTypes.VARCHAR()),
 ])
 source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
-    hadoop_config=Configuration(),
     row_type=row_type,
+    hadoop_config=Configuration(),
     batch_size=500,
     is_utc_timestamp=False,
     is_case_sensitive=True,
diff --git a/docs/content/docs/connectors/datastream/filesystem.md 
b/docs/content/docs/connectors/datastream/filesystem.md
index b024052827d..f5a4517aa95 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -486,6 +486,30 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
+For PyFlink users, `ParquetBulkWriter` can be used to create a 
`BulkWriterFactory` that writes `Row`s into Parquet files.
+
+```python
+row_type = DataTypes.ROW([
+    DataTypes.FIELD('string', DataTypes.STRING()),
+    DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+])
+row_type_info = Types.ROW_NAMED(
+    ['string', 'int_array'],
+    [Types.STRING(), Types.LIST(Types.INT())]
+)
+sink = FileSink.for_bulk_format(
+    OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+        row_type,
+        hadoop_config=Configuration(),
+        utc_timestamp=True,
+    )
+).build()
+# If ds is a source stream producing RowData records, a map could be added to 
help convert RowData records into Row records.
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# Else
+ds.sink_to(sink)
+```
+
 ##### Avro format
 
 Flink also provides built-in support for writing data into Avro files. A list 
of convenience methods to create
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md 
b/docs/content/docs/connectors/datastream/formats/parquet.md
index 217a88fc107..9a686c25925 100644
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -181,8 +181,8 @@ row_type = DataTypes.ROW([
     DataTypes.FIELD('f99', DataTypes.VARCHAR()),
 ])
 source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
-    hadoop_config=Configuration(),
     row_type=row_type,
+    hadoop_config=Configuration(),
     batch_size=500,
     is_utc_timestamp=False,
     is_case_sensitive=True,
diff --git a/flink-python/pyflink/common/typeinfo.py 
b/flink-python/pyflink/common/typeinfo.py
index 5558fad683e..c1ece76f9d9 100644
--- a/flink-python/pyflink/common/typeinfo.py
+++ b/flink-python/pyflink/common/typeinfo.py
@@ -439,7 +439,7 @@ class RowTypeInfo(TypeInformation):
                     zip(self.get_field_names(), self._field_types, 
self._need_conversion))
             elif isinstance(obj, Row) and hasattr(obj, "_fields"):
                 return (obj.get_row_kind().value,) + tuple(
-                    f.to_internal_type(obj.get(n)) if c else obj.get(n)
+                    f.to_internal_type(obj[n]) if c else obj[n]
                     for n, f, c in
                     zip(self.get_field_names(), self._field_types, 
self._need_conversion))
             elif isinstance(obj, Row):
@@ -463,7 +463,7 @@ class RowTypeInfo(TypeInformation):
                 return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in 
self.get_field_names())
             elif isinstance(obj, Row) and hasattr(obj, "_fields"):
                 return (obj.get_row_kind().value,) + tuple(
-                    obj.get(n) for n in self.get_field_names())
+                    obj[n] for n in self.get_field_names())
             elif isinstance(obj, Row):
                 return (obj.get_row_kind().value,) + tuple(obj)
             elif isinstance(obj, (list, tuple)):
diff --git a/flink-python/pyflink/datastream/__init__.py 
b/flink-python/pyflink/datastream/__init__.py
index 11ebea667f4..4bcf0050446 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -211,20 +211,23 @@ Classes to define source & sink:
 Classes to define formats used together with source & sink:
 
     - :class:`formats.csv.CsvReaderFormat`:
-      A :class:`connectors.file_system.StreamFormat` to read CSV files into 
Row data.
+      A :class:`~connectors.file_system.StreamFormat` to read CSV files into 
Row data.
     - :class:`formats.csv.CsvBulkWriter`:
-      Creates :class:`connectors.file_system.BulkWriterFactory` to write Row 
data into CSV files.
+      Creates :class:`~connectors.file_system.BulkWriterFactory` to write Row 
data into CSV files.
     - :class:`formats.avro.GenericRecordAvroTypeInfo`:
-      A :class:`pyflink.common.typeinfo.TypeInformation` to indicate vanilla 
Python records will be
+      A :class:`~pyflink.common.typeinfo.TypeInformation` to indicate vanilla 
Python records will be
       translated to GenericRecordAvroTypeInfo on the Java side.
     - :class:`formats.avro.AvroInputFormat`:
-      A :class:`connector.file_system.InputFormat` to read avro files in a 
streaming fashion.
+      A :class:`~connector.file_system.InputFormat` to read avro files in a 
streaming fashion.
     - :class:`formats.avro.AvroWriters`:
-      A class to provide :class:`connector.file_system.BulkWriterFactory` to 
write vanilla Python
+      A class to provide :class:`~connectors.file_system.BulkWriterFactory` to 
write vanilla Python
       objects into avro files in a batch fashion.
     - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
-      A :class:`connectors.file_system.BulkFormat` to read columnar parquet 
files into Row data in a
-      batch-processing fashion.
+      A :class:`~connectors.file_system.BulkFormat` to read columnar parquet 
files into Row data in
+      a batch-processing fashion.
+    - :class:`formats.parquet.ParquetBulkWriter`:
+      Convenient builder to create a 
:class:`~connectors.file_system.BulkWriterFactory` that writes
+      Rows with a defined RowType into Parquet files in a batch fashion.
     - :class:`formats.parquet.AvroParquetReaders`:
       A convenience builder to create reader format that reads individual Avro 
records from a
       Parquet stream. Only GenericRecord is supported in PyFlink.
diff --git a/flink-python/pyflink/datastream/formats/parquet.py 
b/flink-python/pyflink/datastream/formats/parquet.py
index 39bc6a7d321..687bc35aa4d 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -15,9 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+from typing import Optional
+
 from pyflink.common import Configuration
-from pyflink.datastream.connectors.file_system import StreamFormat, 
BulkFormat, BulkWriterFactory
+from pyflink.datastream.connectors.file_system import StreamFormat, 
BulkFormat, BulkWriterFactory, \
+    RowDataBulkWriterFactory
 from pyflink.datastream.formats.avro import AvroSchema
+from pyflink.datastream.utils import create_hadoop_configuration
 from pyflink.java_gateway import get_gateway
 from pyflink.table.types import RowType, _to_java_data_type
 
@@ -25,7 +29,8 @@ from pyflink.table.types import RowType, _to_java_data_type
 __all__ = [
     'AvroParquetReaders',
     'AvroParquetWriters',
-    'ParquetColumnarRowInputFormat'
+    'ParquetBulkWriter',
+    'ParquetColumnarRowInputFormat',
 ]
 
 
@@ -67,9 +72,53 @@ class AvroParquetReaders(object):
         return 
StreamFormat(JAvroParquetReaders.forGenericRecord(schema._j_schema))
 
 
+class AvroParquetWriters(object):
+    """
+    Convenient builder to create ParquetWriterFactory instances for Avro 
types. Only GenericRecord
+    is supported in PyFlink.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @staticmethod
+    def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory':
+        """
+        Creates a ParquetWriterFactory that accepts and writes Avro generic 
types. The Parquet
+        writers will use the given schema to build and write the columnar data.
+
+        Note that to make this work in PyFlink, you need to declare the 
output type of the
+        predecessor before FileSink to be :class:`GenericRecordAvroTypeInfo`, 
and the predecessor
+        cannot be :meth:`StreamExecutionEnvironment.from_collection`, you can 
add a pass-through map
+        function before the sink, as the example shown below.
+
+        The Python data records should match the Avro schema, and have the 
same behavior as
+        vanilla Python data structures, e.g. an object for Avro array should 
behave like Python list,
+        an object for Avro map should behave like Python dict.
+
+        Example:
+        ::
+
+            >>> env = StreamExecutionEnvironment.get_execution_environment()
+            >>> schema = AvroSchema(JSON_SCHEMA)
+            >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
+            >>> ds = env.from_collection([{'array': [1, 2]}], 
type_info=Types.PICKLED_BYTE_ARRAY())
+            >>> sink = FileSink.for_bulk_format(
+            ...     OUTPUT_DIR, 
AvroParquetWriters.for_generic_record(schema)).build()
+            >>> # A map to indicate its Avro type info is necessary for 
serialization
+            >>> ds.map(lambda e: e, 
output_type=GenericRecordAvroTypeInfo(schema)) \\
+            ...     .sink_to(sink)
+
+        :param schema: The avro schema.
+        :return: The BulkWriterFactory to write generic records into parquet 
files.
+        """
+        jvm = get_gateway().jvm
+        JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
+        return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
 class ParquetColumnarRowInputFormat(BulkFormat):
     """
-    A ParquetVectorizedInputFormat to provide :class:`RowData` iterator. Using 
ColumnarRowData to
+    A ParquetVectorizedInputFormat to provide RowData iterator. Using 
ColumnarRowData to
     provide a row view of column batch. Only **primitive** types are supported 
for a column,
     composite types such as array, map are not supported.
 
@@ -81,10 +130,10 @@ class ParquetColumnarRowInputFormat(BulkFormat):
         ...     DataTypes.FIELD('b', DataTypes.STRING()),
         ... ])
         >>> source = 
FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
-        ...     hadoop_config=Configuration(),
         ...     row_type=row_type,
-        ...     batch_size=500,
-        ...     is_utc_timestamp=True,
+        ...     hadoop_config=Configuration(),
+        ...     batch_size=2048,
+        ...     is_utc_timestamp=False,
         ...     is_case_sensitive=True,
         ... ), PARQUET_FILE_PATH).build()
         >>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 
"parquet-source")
@@ -92,66 +141,72 @@ class ParquetColumnarRowInputFormat(BulkFormat):
     .. versionadded:: 1.16.0
     """
 
-    def __init__(self, hadoop_config: Configuration, row_type: RowType, 
batch_size: int,
-                 is_utc_timestamp: bool, is_case_sensitive: bool):
+    def __init__(self,
+                 row_type: RowType,
+                 hadoop_config: Optional[Configuration] = None,
+                 batch_size: int = 2048,
+                 is_utc_timestamp: bool = False,
+                 is_case_sensitive: bool = True):
+        if not hadoop_config:
+            hadoop_config = Configuration()
+
         jvm = get_gateway().jvm
         j_row_type = _to_java_data_type(row_type).getLogicalType()
         produced_type_info = jvm.org.apache.flink.table.runtime.typeutils. \
             InternalTypeInfo.of(j_row_type)
         j_parquet_columnar_format = jvm.org.apache.flink.formats.parquet. \
-            
ParquetColumnarRowInputFormat(self._create_hadoop_configuration(hadoop_config),
+            
ParquetColumnarRowInputFormat(create_hadoop_configuration(hadoop_config),
                                           j_row_type, produced_type_info, 
batch_size,
                                           is_utc_timestamp, is_case_sensitive)
         super().__init__(j_parquet_columnar_format)
 
-    @staticmethod
-    def _create_hadoop_configuration(config: Configuration):
-        jvm = get_gateway().jvm
-        hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
-        for k, v in config.to_dict().items():
-            hadoop_config.set(k, v)
-        return hadoop_config
-
 
-class AvroParquetWriters(object):
+class ParquetBulkWriter(object):
     """
-    Convenience builder to create ParquetWriterFactory instances for Avro 
types. Only GenericRecord
-    is supported in PyFlink.
+    Convenient builder to create a :class:`BulkWriterFactory` that writes Rows 
with a defined
+    RowType into Parquet files in a batch fashion.
 
     .. versionadded:: 1.16.0
     """
 
     @staticmethod
-    def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory':
+    def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration] 
= None,
+                     utc_timestamp: bool = False) -> 'BulkWriterFactory':
         """
-        Creates a ParquetWriterFactory that accepts and writes Avro generic 
types. The Parquet
-        writers will use the given schema to build and write the columnar data.
-
-        Note that to make this works in PyFlink, you need to declare the 
output type of the
-        predecessor before FileSink to be :class:`GenericRecordAvroTypeInfo`, 
and the predecessor
-        cannot be :meth:`StreamExecutionEnvironment.from_collection`, you can 
add a pass-through map
-        function before the sink, as the example shown below.
-
-        The Python data records should match the Avro schema, and have the 
same behavior with
-        vanilla Python data structure, e.g. an object for Avro array should 
behave like Python list,
-        an object for Avro map should behave like Python dict.
+        Create a RowDataBulkWriterFactory that writes Row records with a 
defined RowType into
+        Parquet files in a batch fashion.
 
         Example:
         ::
 
-            >>> env = StreamExecutionEnvironment.get_execution_environment()
-            >>> schema = AvroSchema(JSON_SCHEMA)
-            >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
-            >>> ds = env.from_collection([{'array': [1, 2]}], 
type_info=Types.PICKLED_BYTE_ARRAY())
+            >>> row_type = DataTypes.ROW([
+            ...     DataTypes.FIELD('string', DataTypes.STRING()),
+            ...     DataTypes.FIELD('int_array', 
DataTypes.ARRAY(DataTypes.INT()))
+            ... ])
+            >>> row_type_info = Types.ROW_NAMED(
+            ...     ['string', 'int_array'],
+            ...     [Types.STRING(), Types.LIST(Types.INT())]
+            ... )
             >>> sink = FileSink.for_bulk_format(
-            ...     OUTPUT_DIR, 
AvroParquetWriters.for_generic_record(schema)).build()
-            >>> # A map to indicate its Avro type info is necessary for 
serialization
-            >>> ds.map(lambda e: e, 
output_type=GenericRecordAvroTypeInfo(schema)) \\
-            ...     .sink_to(sink)
+            ...     OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+            ...         row_type,
+            ...         hadoop_config=Configuration(),
+            ...         utc_timestamp=True,
+            ...     )
+            ... ).build()
+            >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
 
-        :param schema: The avro schema.
-        :return: The BulkWriterFactory to write generic records into parquet 
files.
+        Note that in the above example, an identity map to indicate its 
RowTypeInfo is necessary
+        before ``sink_to`` when ``ds`` is a source stream producing 
**RowData** records, because
+        RowDataBulkWriterFactory assumes the input record type is **Row** .
         """
+        if not hadoop_config:
+            hadoop_config = Configuration()
+
         jvm = get_gateway().jvm
-        JAvroParquetWriters = 
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
-        return 
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+        JParquetRowDataBuilder = 
jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
+        return 
RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
+            _to_java_data_type(row_type).getLogicalType(),
+            create_hadoop_configuration(hadoop_config),
+            utc_timestamp
+        ), row_type)
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py 
b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index b914d185e9f..16bf47edbfe 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -15,15 +15,23 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import calendar
+import datetime
 import glob
 import os
 import tempfile
+import time
 import unittest
-from typing import List
+from decimal import Decimal
+from typing import List, Optional, Tuple
 
-from pyflink.common import Configuration
+import pandas as pd
+import pytz
+from pyflink.common.time import Instant
+
+from pyflink.common import Configuration, Row
+from pyflink.common.typeinfo import RowTypeInfo, Types
 from pyflink.common.watermark_strategy import WatermarkStrategy
-from pyflink.datastream import MapFunction
 from pyflink.datastream.connectors.file_system import FileSource, FileSink
 from pyflink.datastream.formats.tests.test_avro import \
     _create_basic_avro_schema_and_py_objects, 
_check_basic_avro_schema_results, \
@@ -36,11 +44,12 @@ from pyflink.datastream.formats.tests.test_avro import \
     _create_basic_avro_schema_and_records, _import_avro_classes
 from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, 
AvroSchema
 from pyflink.datastream.formats.parquet import AvroParquetReaders, 
ParquetColumnarRowInputFormat, \
-    AvroParquetWriters
+    AvroParquetWriters, ParquetBulkWriter
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
+from pyflink.datastream.utils import create_hadoop_configuration
 from pyflink.java_gateway import get_gateway
-from pyflink.table.types import RowType, DataTypes
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.table.types import RowType, DataTypes, _to_java_data_type
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, 
to_java_data_structure
 
 
 @unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
@@ -106,7 +115,7 @@ class 
FileSourceAvroParquetReadersTests(PyFlinkStreamingTestCase):
             WatermarkStrategy.for_monotonous_timestamps(),
             "parquet-source"
         )
-        ds.map(PassThroughMapFunction()).add_sink(self.test_sink)
+        ds.map(lambda e: e).add_sink(self.test_sink)
 
     @staticmethod
     def _create_parquet_avro_file(file_path: str, schema: AvroSchema, records: 
list):
@@ -192,40 +201,234 @@ class 
FileSourceParquetColumnarRowInputFormatTests(PyFlinkStreamingTestCase):
     def setUp(self):
         super().setUp()
         self.test_sink = DataStreamTestSinkFunction()
-        _import_avro_classes()
+        self.parquet_file_name = tempfile.mktemp(suffix='.parquet', 
dir=self.tempdir)
 
-    def test_parquet_columnar_basic(self):
-        parquet_file_name = tempfile.mktemp(suffix='.parquet', 
dir=self.tempdir)
-        schema, records = _create_basic_avro_schema_and_records()
-        FileSourceAvroParquetReadersTests._create_parquet_avro_file(
-            parquet_file_name, schema, records)
-        row_type = DataTypes.ROW([
-            DataTypes.FIELD('null', DataTypes.STRING()),  # DataTypes.NULL 
cannot be serialized
-            DataTypes.FIELD('boolean', DataTypes.BOOLEAN()),
-            DataTypes.FIELD('int', DataTypes.INT()),
-            DataTypes.FIELD('long', DataTypes.BIGINT()),
-            DataTypes.FIELD('float', DataTypes.FLOAT()),
-            DataTypes.FIELD('double', DataTypes.DOUBLE()),
-            DataTypes.FIELD('string', DataTypes.STRING()),
-            DataTypes.FIELD('unknown', DataTypes.STRING())
-        ])
-        self._build_parquet_columnar_job(row_type, parquet_file_name)
-        self.env.execute('test_parquet_columnar_basic')
+    def test_parquet_columnar_basic_read(self):
+        os.environ['TZ'] = 'Asia/Shanghai'
+        time.tzset()
+        row_type, _, data = _create_parquet_basic_row_and_data()
+        _write_row_data_to_parquet_file(self.parquet_file_name, row_type, data)
+        self._build_parquet_columnar_job(row_type)
+        self.env.execute('test_parquet_columnar_basic_read')
         results = self.test_sink.get_results(True, False)
-        _check_basic_avro_schema_results(self, results)
-        self.assertIsNone(results[0]['unknown'])
-        self.assertIsNone(results[1]['unknown'])
+        _check_parquet_basic_results(self, results)
 
-    def _build_parquet_columnar_job(self, row_type: RowType, 
parquet_file_name: str):
+    def _build_parquet_columnar_job(self, row_type: RowType):
         source = FileSource.for_bulk_file_format(
-            ParquetColumnarRowInputFormat(Configuration(), row_type, 10, True, 
True),
-            parquet_file_name
+            ParquetColumnarRowInputFormat(row_type, Configuration(), 10, True, 
False),
+            self.parquet_file_name
         ).build()
         ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), 
'parquet-source')
-        ds.map(PassThroughMapFunction()).add_sink(self.test_sink)
+        ds.map(lambda e: e).add_sink(self.test_sink)
+
+
+@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
+                 'Some Hadoop lib is needed for Parquet RowData format tests')
+class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
+
+    def setUp(self):
+        super().setUp()
+        # NOTE: parallelism == 1 is required to keep the order of results
+        self.env.set_parallelism(1)
+        self.parquet_dir_name = tempfile.mkdtemp(dir=self.tempdir)
+
+    def test_parquet_row_data_basic_write(self):
+        os.environ['TZ'] = 'Asia/Shanghai'
+        time.tzset()
+        row_type, row_type_info, data = _create_parquet_basic_row_and_data()
+        self._build_parquet_job(row_type, row_type_info, data)
+        self.env.execute('test_parquet_row_data_basic_write')
+        results = self._read_parquet_file()
+        _check_parquet_basic_results(self, results)
+
+    def test_parquet_row_data_array_write(self):
+        (
+            row_type,
+            row_type_info,
+            conversion_row_type_info,
+            data,
+        ) = _create_parquet_array_row_and_data()
+        self._build_parquet_job(row_type, row_type_info, data, 
conversion_row_type_info)
+        self.env.execute('test_parquet_row_data_array_write')
+        results = self._read_parquet_file()
+        _check_parquet_array_results(self, results)
+
+    @unittest.skip('ParquetSchemaConverter in flink-parquet annotate map keys 
as optional, but '
+                   'Arrow restricts them to be required')
+    def test_parquet_row_data_map_write(self):
+        row_type, row_type_info, data = _create_parquet_map_row_and_data()
+        self._build_parquet_job(row_type, row_type_info, data)
+        self.env.execute('test_parquet_row_data_map_write')
+        results = self._read_parquet_file()
+        _check_parquet_map_results(self, results)
+
+    def _build_parquet_job(
+        self,
+        row_type: RowType,
+        row_type_info: RowTypeInfo,
+        data: List[Row],
+        conversion_type_info: Optional[RowTypeInfo] = None,
+    ):
+        sink = FileSink.for_bulk_format(
+            self.parquet_dir_name, ParquetBulkWriter.for_row_type(row_type, 
utc_timestamp=True)
+        ).build()
+        ds = self.env.from_collection(data, type_info=row_type_info)
+        if conversion_type_info:
+            ds = ds.map(lambda e: e, output_type=conversion_type_info)
+        ds.sink_to(sink)
+
+    def _read_parquet_file(self):
+        records = []
+        for file in glob.glob(os.path.join(os.path.join(self.parquet_dir_name, 
'**/*'))):
+            df = pd.read_parquet(file)
+            for i in range(df.shape[0]):
+                records.append(df.loc[i])
+        return records
+
+
+def _write_row_data_to_parquet_file(path: str, row_type: RowType, rows: 
List[Row]):
+    jvm = get_gateway().jvm
+    flink = jvm.org.apache.flink
+
+    j_output_stream = 
flink.core.fs.local.LocalDataOutputStream(jvm.java.io.File(path))
+    j_bulk_writer = 
flink.formats.parquet.row.ParquetRowDataBuilder.createWriterFactory(
+        _to_java_data_type(row_type).getLogicalType(),
+        create_hadoop_configuration(Configuration()),
+        True,
+    ).create(j_output_stream)
+    row_row_converter = flink.table.data.conversion.RowRowConverter.create(
+        _to_java_data_type(row_type)
+    )
+    row_row_converter.open(row_row_converter.getClass().getClassLoader())
+    for row in rows:
+        
j_bulk_writer.addElement(row_row_converter.toInternal(to_java_data_structure(row)))
+    j_bulk_writer.finish()
+
+
+def _create_parquet_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, 
List[Row]]:
+    row_type = DataTypes.ROW([
+        DataTypes.FIELD('char', DataTypes.CHAR(10)),
+        DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)),
+        DataTypes.FIELD('binary', DataTypes.BINARY(10)),
+        DataTypes.FIELD('varbinary', DataTypes.VARBINARY(10)),
+        DataTypes.FIELD('boolean', DataTypes.BOOLEAN()),
+        DataTypes.FIELD('decimal', DataTypes.DECIMAL(2, 0)),
+        DataTypes.FIELD('int', DataTypes.INT()),
+        DataTypes.FIELD('bigint', DataTypes.BIGINT()),
+        DataTypes.FIELD('double', DataTypes.DOUBLE()),
+        DataTypes.FIELD('date', DataTypes.DATE().bridged_to('java.sql.Date')),
+        DataTypes.FIELD('time', DataTypes.TIME().bridged_to('java.sql.Time')),
+        DataTypes.FIELD('timestamp', 
DataTypes.TIMESTAMP(3).bridged_to('java.sql.Timestamp')),
+        DataTypes.FIELD('timestamp_ltz', DataTypes.TIMESTAMP_LTZ(3)),
+    ])
+    row_type_info = Types.ROW_NAMED(
+        ['char', 'varchar', 'binary', 'varbinary', 'boolean', 'decimal', 
'int', 'bigint', 'double',
+         'date', 'time', 'timestamp', 'timestamp_ltz'],
+        [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()),
+         Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(), 
Types.BIG_DEC(), Types.INT(),
+         Types.LONG(), Types.DOUBLE(), Types.SQL_DATE(), Types.SQL_TIME(), 
Types.SQL_TIMESTAMP(),
+         Types.INSTANT()]
+    )
+    datetime_ltz = datetime.datetime(1970, 2, 3, 4, 5, 6, 700000, 
tzinfo=pytz.timezone('UTC'))
+    timestamp_ltz = Instant.of_epoch_milli(
+        (
+            calendar.timegm(datetime_ltz.utctimetuple()) +
+            calendar.timegm(time.localtime(0))
+        ) * 1000 + datetime_ltz.microsecond // 1000
+    )
+    data = [Row(
+        char='char',
+        varchar='varchar',
+        binary=b'binary',
+        varbinary=b'varbinary',
+        boolean=True,
+        decimal=Decimal(1.5),
+        int=2147483647,
+        bigint=-9223372036854775808,
+        double=2e-308,
+        date=datetime.date(1970, 1, 1),
+        time=datetime.time(1, 1, 1),
+        timestamp=datetime.datetime(1970, 1, 2, 3, 4, 5, 600000),
+        timestamp_ltz=timestamp_ltz
+    )]
+    return row_type, row_type_info, data
+
+
+def _check_parquet_basic_results(test, results):
+    row = results[0]
+    test.assertEqual(row['char'], 'char')
+    test.assertEqual(row['varchar'], 'varchar')
+    test.assertEqual(row['binary'], b'binary')
+    test.assertEqual(row['varbinary'], b'varbinary')
+    test.assertEqual(row['boolean'], True)
+    test.assertAlmostEqual(row['decimal'], 2)
+    test.assertEqual(row['int'], 2147483647)
+    test.assertEqual(row['bigint'], -9223372036854775808)
+    test.assertAlmostEqual(row['double'], 2e-308, delta=1e-311)
+    test.assertEqual(row['date'], datetime.date(1970, 1, 1))
+    test.assertEqual(row['time'], datetime.time(1, 1, 1))
+    ts = row['timestamp']
+    if isinstance(ts, pd.Timestamp):
+        ts = ts.to_pydatetime()
+    test.assertEqual(ts, datetime.datetime(1970, 1, 2, 3, 4, 5, 600000))
+    ts_ltz = row['timestamp_ltz']
+    if isinstance(ts_ltz, pd.Timestamp):
+        ts_ltz = 
pytz.timezone('Asia/Shanghai').localize(ts_ltz.to_pydatetime())
+    test.assertEqual(
+        ts_ltz,
+        pytz.timezone('Asia/Shanghai').localize(datetime.datetime(1970, 2, 3, 
12, 5, 6, 700000))
+    )
+
+
+def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, 
RowTypeInfo, List[Row]]:
+    row_type = DataTypes.ROW([
+        DataTypes.FIELD('string_array', DataTypes.ARRAY(DataTypes.STRING())),
+        DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())),
+    ])
+    row_type_info = Types.ROW_NAMED([
+        'string_array',
+        'int_array',
+    ], [
+        Types.LIST(Types.STRING()),
+        Types.LIST(Types.INT()),
+    ])
+    conversion_row_type_info = Types.ROW_NAMED([
+        'string_array',
+        'int_array',
+    ], [
+        Types.OBJECT_ARRAY(Types.STRING()),
+        Types.OBJECT_ARRAY(Types.INT()),
+    ])
+    data = [Row(
+        string_array=['a', 'b', 'c'],
+        int_array=[1, 2, 3],
+    )]
+    return row_type, row_type_info, conversion_row_type_info, data
+
+
+def _check_parquet_array_results(test, results):
+    row = results[0]
+    test.assertEqual(row['string_array'][0], 'a')
+    test.assertEqual(row['string_array'][1], 'b')
+    test.assertEqual(row['string_array'][2], 'c')
+    test.assertEqual(row['int_array'][0], 1)
+    test.assertEqual(row['int_array'][1], 2)
+    test.assertEqual(row['int_array'][2], 3)
+
 
+def _create_parquet_map_row_and_data() -> Tuple[RowType, RowTypeInfo, 
List[Row]]:
+    row_type = DataTypes.ROW([
+        DataTypes.FIELD('map', DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())),
+    ])
+    row_type_info = Types.ROW_NAMED(['map'], [Types.MAP(Types.INT(), 
Types.STRING())])
+    data = [Row(
+        map={0: 'a', 1: 'b', 2: 'c'}
+    )]
+    return row_type, row_type_info, data
 
-class PassThroughMapFunction(MapFunction):
 
-    def map(self, value):
-        return value
+def _check_parquet_map_results(test, results):
+    m = {k: v for k, v in results[0]['map']}
+    test.assertEqual(m[0], 'a')
+    test.assertEqual(m[1], 'b')
+    test.assertEqual(m[2], 'c')
diff --git a/flink-python/pyflink/datastream/utils.py 
b/flink-python/pyflink/datastream/utils.py
index 73ec0faa1bf..f1bdb186177 100644
--- a/flink-python/pyflink/datastream/utils.py
+++ b/flink-python/pyflink/datastream/utils.py
@@ -20,7 +20,7 @@ import datetime
 import pickle
 from abc import abstractmethod
 
-from pyflink.common import Row, RowKind
+from pyflink.common import Row, RowKind, Configuration
 from pyflink.common.typeinfo import (RowTypeInfo, TupleTypeInfo, Types, BasicArrayTypeInfo,
                                      PrimitiveArrayTypeInfo, MapTypeInfo, ListTypeInfo,
                                      ObjectArrayTypeInfo, ExternalTypeInfo, TypeInformation)
@@ -43,6 +43,14 @@ class JavaObjectWrapper(object):
         return self._j_object
 
 
+def create_hadoop_configuration(config: Configuration):
+    jvm = get_gateway().jvm
+    hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
+    for k, v in config.to_dict().items():
+        hadoop_config.set(k, v)
+    return hadoop_config
+
+
 def convert_to_python_obj(data, type_info):
     if type_info == Types.PICKLED_BYTE_ARRAY():
         return pickle.loads(data)
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index de1c27777d3..1527adb4e39 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -725,13 +725,17 @@ def from_type_info_proto(type_info):
             return RowCoder(
                 [from_type_info_proto(f.field_type) for f in type_info.row_type_info.fields],
                 [f.field_name for f in type_info.row_type_info.fields])
-        elif field_type_name == type_info_name.PRIMITIVE_ARRAY:
+        elif field_type_name in (
+            type_info_name.PRIMITIVE_ARRAY,
+            type_info_name.LIST,
+        ):
             if type_info.collection_element_type.type_name == type_info_name.BYTE:
                 return BinaryCoder()
             return PrimitiveArrayCoder(from_type_info_proto(type_info.collection_element_type))
-        elif field_type_name in (type_info_name.BASIC_ARRAY,
-                                 type_info_name.OBJECT_ARRAY,
-                                 type_info_name.LIST):
+        elif field_type_name in (
+            type_info_name.BASIC_ARRAY,
+            type_info_name.OBJECT_ARRAY,
+        ):
             return GenericArrayCoder(from_type_info_proto(type_info.collection_element_type))
         elif field_type_name == type_info_name.TUPLE:
             return TupleCoder([from_type_info_proto(field_type)
diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py
index add5b042f7d..3c3d2a33b02 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -1250,7 +1250,7 @@ class RowType(DataType):
                     for n, f, c in zip(self.names, self.fields, self._need_conversion))
             elif isinstance(obj, Row) and hasattr(obj, "_fields"):
                 return (obj.get_row_kind().value,) + tuple(
-                    f.to_sql_type(obj.get(n)) if c else obj.get(n)
+                    f.to_sql_type(obj[n]) if c else obj[n]
                     for n, f, c in zip(self.names, self.fields, self._need_conversion))
             elif isinstance(obj, Row):
                 return (obj.get_row_kind().value, ) + tuple(
@@ -1931,6 +1931,12 @@ def _to_java_data_type(data_type: DataType):
     else:
         j_data_type = j_data_type.notNull()
 
+    if data_type._conversion_cls:
+        j_data_type = j_data_type.bridgedTo(
+            gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.ReflectionUtil
+            .classForName(data_type._conversion_cls)
+        )
+
     return j_data_type
 
 
@@ -2581,7 +2587,8 @@ class DataTypes(object):
         This is a shortcut for ``DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision, nullable)``.
 
         :param precision: int, the number of digits of fractional seconds.
-                          It must have a value between 0 and 9 (both inclusive). (default: 6)
+                          It must have a value between 0 and 9 (both inclusive). (default: 6, only
+                          supports 3 when bridged to DataStream)
         :param nullable: boolean, whether the type can be null (None) or not.
 
         .. seealso:: :func:`~DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision, nullable)`
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 2ddc2b73707..5ea9cf80987 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -283,6 +283,27 @@ class TestEnv(object):
         return result
 
 
+DATE_EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+TIME_EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 3
+
+
+def _date_to_millis(d: datetime.date):
+    return (d.toordinal() - DATE_EPOCH_ORDINAL) * 86400 * 1000
+
+
+def _time_to_millis(t: datetime.time):
+    if t.tzinfo is not None:
+        offset = t.utcoffset()
+        offset = offset if offset else datetime.timedelta()
+        offset_millis = \
+            (offset.days * 86400 + offset.seconds) * 10 ** 3 + offset.microseconds // 1000
+    else:
+        offset_millis = TIME_EPOCH_ORDINAL
+    minutes = t.hour * 60 + t.minute
+    seconds = minutes * 60 + t.second
+    return seconds * 10 ** 3 + t.microsecond // 1000 - offset_millis
+
+
 def to_java_data_structure(value):
     jvm = get_gateway().jvm
     if isinstance(value, (int, float, str, bytes)):
@@ -290,12 +311,10 @@ def to_java_data_structure(value):
     elif isinstance(value, Decimal):
         return jvm.java.math.BigDecimal.valueOf(float(value))
     elif isinstance(value, datetime.datetime):
-        local_date_time = jvm.java.time.LocalDateTime.of(
-            value.year, value.month, value.day, value.hour, value.minute, value.second,
-            value.microsecond * 1000
-        )
         if value.tzinfo is None:
-            return local_date_time
+            return jvm.java.sql.Timestamp(
+                _date_to_millis(value.date()) + _time_to_millis(value.time())
+            )
         return jvm.java.time.Instant.ofEpochMilli(
             (
                 calendar.timegm(value.utctimetuple()) +
@@ -304,12 +323,11 @@ def to_java_data_structure(value):
             value.microsecond // 1000
         )
     elif isinstance(value, datetime.date):
-        return jvm.java.time.LocalDate.of(value.year, value.month, value.day)
+        return jvm.java.sql.Date(_date_to_millis(value))
     elif isinstance(value, datetime.time):
-        return jvm.java.time.LocalTime.of(value.hour, value.minute, value.second,
-                                          value.microsecond * 1000)
+        return jvm.java.sql.Time(_time_to_millis(value))
     elif isinstance(value, Time):
-        return jvm.java.time.LocalTime.of()
+        return jvm.java.sql.Time(value.to_milliseconds())
     elif isinstance(value, Instant):
         return jvm.java.time.Instant.ofEpochMilli(value.to_epoch_milli())
     elif isinstance(value, (list, tuple)):
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 2cde298ce55..b717d64b606 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -418,7 +418,7 @@ public class PythonTypeUtils {
                 InternalTypeInfo<?> internalTypeInfo, ClassLoader userClassLoader) {
             ArrayType arrayType = (ArrayType) internalTypeInfo.toLogicalType();
             return FlinkFnApi.TypeInfo.newBuilder()
-                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.LIST)
+                    .setTypeName(FlinkFnApi.TypeInfo.TypeName.OBJECT_ARRAY)
                     .setCollectionElementType(
                             toTypeInfoProto(
                                     InternalTypeInfo.of(arrayType.getElementType()),

Reply via email to