This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6844be2128d [FLINK-28895][python] Perform RowRowConverter automatically when writing RowData into sink
6844be2128d is described below

commit 6844be2128dd4c6ec62fe6e0b5230d885393e1ce
Author: Juntao Hu <[email protected]>
AuthorDate: Wed Aug 10 10:21:38 2022 +0800

    [FLINK-28895][python] Perform RowRowConverter automatically when writing RowData into sink
    
    This closes #20525.
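
A minimal usage sketch of the behavior this commit enables (the output path and sample
records are illustrative, and the example assumes the Parquet format jars such as
flink-sql-parquet are on the classpath): a bulk FileSink can now be fed directly, without
the identity map the documentation previously required when the upstream operator
produced RowData.

    from pyflink.common import Configuration, Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink
    from pyflink.datastream.formats.parquet import ParquetBulkWriters
    from pyflink.table.types import DataTypes

    env = StreamExecutionEnvironment.get_execution_environment()

    # Logical row type handed to the Parquet writer factory.
    row_type = DataTypes.ROW([
        DataTypes.FIELD('name', DataTypes.STRING()),
        DataTypes.FIELD('age', DataTypes.INT()),
    ])

    sink = FileSink.for_bulk_format(
        '/tmp/parquet-out',  # illustrative output directory
        ParquetBulkWriters.for_row_type(row_type, hadoop_config=Configuration()),
    ).build()

    ds = env.from_collection(
        [Row(name='alice', age=30), Row(name='bob', age=25)],
        type_info=Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()]),
    )

    # With this commit the sink performs the Row/RowData conversion automatically,
    # so no `ds.map(lambda e: e, output_type=...)` step is needed before sink_to.
    ds.sink_to(sink)
    env.execute('parquet_sink_example')
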
---
 .../docs/connectors/datastream/filesystem.md       | 23 +++-------
 .../docs/connectors/datastream/formats/csv.md      | 15 +++----
 .../docs/connectors/datastream/filesystem.md       | 23 +++-------
 .../docs/connectors/datastream/formats/csv.md      |  7 +---
 flink-python/pom.xml                               |  4 ++
 flink-python/pyflink/datastream/__init__.py        |  2 +-
 .../pyflink/datastream/connectors/file_system.py   | 18 +++++++-
 flink-python/pyflink/datastream/formats/csv.py     | 14 +------
 flink-python/pyflink/datastream/formats/orc.py     | 46 +++++++++-----------
 flink-python/pyflink/datastream/formats/parquet.py | 49 ++++++++++------------
 .../pyflink/datastream/formats/tests/test_csv.py   |  2 +-
 .../pyflink/datastream/formats/tests/test_orc.py   | 32 ++++----------
 .../datastream/formats/tests/test_parquet.py       | 42 +++++++------------
 13 files changed, 106 insertions(+), 171 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 20d7a0bee38..80f49f4cb02 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -488,27 +488,22 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
-PyFlink 用户可以使用 `ParquetBulkWriter` 来创建一个将 `Row` 数据写入 Parquet 文件的 `BulkWriterFactory` 。
+PyFlink 用户可以使用 `ParquetBulkWriters` 来创建一个将 `Row` 数据写入 Parquet 文件的 `BulkWriterFactory` 。
 
 ```python
 row_type = DataTypes.ROW([
     DataTypes.FIELD('string', DataTypes.STRING()),
     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
 ])
-row_type_info = Types.ROW_NAMED(
-    ['string', 'int_array'],
-    [Types.STRING(), Types.LIST(Types.INT())]
-)
+
 sink = FileSink.for_bulk_format(
-    OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+    OUTPUT_DIR, ParquetBulkWriters.for_row_type(
         row_type,
         hadoop_config=Configuration(),
         utc_timestamp=True,
     )
 ).build()
-# 如果 ds 是一个输出类型为 RowData 的源数据源,可以使用一个 map 来转换为 Row 类型
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# 否则
+
 ds.sink_to(sink)
 ```
 
@@ -811,8 +806,7 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
 {{< /tab >}}
 {{< /tabs >}}
 
-PyFlink 用户可以使用 `OrcBulkWriters.for_row_type` 来创建将 `Row` 数据写入 Orc 文件的 `BulkWriterFactory` 。
-注意如果 sink 的前置算子的输出类型为 `RowData` ,例如 CSV source ,则需要先转换为 `Row` 类型。
+PyFlink 用户可以使用 `OrcBulkWriters` 来创建将数据写入 Orc 文件的 `BulkWriterFactory` 。
 
 {{< py_download_link "orc" >}}
 
@@ -821,10 +815,6 @@ row_type = DataTypes.ROW([
     DataTypes.FIELD('name', DataTypes.STRING()),
     DataTypes.FIELD('age', DataTypes.INT()),
 ])
-row_type_info = Types.ROW_NAMED(
-    ['name', 'age'],
-    [Types.STRING(), Types.INT()]
-)
 
 sink = FileSink.for_bulk_format(
     OUTPUT_DIR,
@@ -835,9 +825,6 @@ sink = FileSink.for_bulk_format(
     )
 ).build()
 
-# 如果 ds 是产生 RowData 的数据源,可以使用一个 map 函数来指定其对应的 Row 类型。
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# 否则
 ds.sink_to(sink)
 ```
 
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 2d3ef10455f..94aedd824af 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -138,20 +138,17 @@ The corresponding CSV file:
 
 Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continues and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}})  for examples).
 
-For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
-It should be noted that if the preceding operator of sink is an operator which produces `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
+For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write records to files in CSV format.
+
 ```python
-schema = CsvSchema.builder()
-    .add_number_column('id', number_type=DataTypes.BIGINT())
-    .add_array_column('array', separator='#', element_type=DataTypes.INT())
-    .set_column_separator(',')
+schema = CsvSchema.builder() \
+    .add_number_column('id', number_type=DataTypes.BIGINT()) \
+    .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
+    .set_column_separator(',') \
     .build()
 
 sink = FileSink.for_bulk_format(
     OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
 
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
-# Else
 ds.sink_to(sink)
 ```
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index e5e39646a43..c1f047c6e86 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -486,27 +486,22 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
-For PyFlink users, `ParquetBulkWriter` could be used to create a `BulkWriterFactory` that writes `Row`s into Parquet files.
+For PyFlink users, `ParquetBulkWriters` could be used to create a `BulkWriterFactory` that writes `Row`s into Parquet files.
 
 ```python
 row_type = DataTypes.ROW([
     DataTypes.FIELD('string', DataTypes.STRING()),
     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
 ])
-row_type_info = Types.ROW_NAMED(
-    ['string', 'int_array'],
-    [Types.STRING(), Types.LIST(Types.INT())]
-)
+
 sink = FileSink.for_bulk_format(
-    OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+    OUTPUT_DIR, ParquetBulkWriters.for_row_type(
         row_type,
         hadoop_config=Configuration(),
         utc_timestamp=True,
     )
 ).build()
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# Else
+
 ds.sink_to(sink)
 ```
 
@@ -816,8 +811,7 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
 {{< /tab >}}
 {{< /tabs >}}
 
-For PyFlink users, `OrcBulkWriters.for_row_type` could be used to create `BulkWriterFactory` to write `Row` records to files in Orc format.
-It should be noted that if the preceding operator of sink is an operator producing `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
+For PyFlink users, `OrcBulkWriters` could be used to create `BulkWriterFactory` to write records to files in Orc format.
 
 {{< py_download_link "orc" >}}
 
@@ -826,10 +820,6 @@ row_type = DataTypes.ROW([
     DataTypes.FIELD('name', DataTypes.STRING()),
     DataTypes.FIELD('age', DataTypes.INT()),
 ])
-row_type_info = Types.ROW_NAMED(
-    ['name', 'age'],
-    [Types.STRING(), Types.INT()]
-)
 
 sink = FileSink.for_bulk_format(
     OUTPUT_DIR,
@@ -840,9 +830,6 @@ sink = FileSink.for_bulk_format(
     )
 ).build()
 
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# Else
 ds.sink_to(sink)
 ```
 
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 60cacc9d5c8..94aedd824af 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -138,8 +138,8 @@ The corresponding CSV file:
 
 Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continues and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}})  for examples).
 
-For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
-It should be noted that if the preceding operator of sink is an operator which produces `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
+For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write records to files in CSV format.
+
 ```python
 schema = CsvSchema.builder() \
     .add_number_column('id', number_type=DataTypes.BIGINT()) \
@@ -150,8 +150,5 @@ schema = CsvSchema.builder() \
 sink = FileSink.for_bulk_format(
     OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
 
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
-# Else
 ds.sink_to(sink)
 ```
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 5922e98a7e4..cb61c211806 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -512,6 +512,10 @@ under the License.
                                                                         <groupId>org.apache.flink</groupId>
                                                                         <artifactId>flink-sql-parquet</artifactId>
                                                                 </artifactItem>
+                                                               <artifactItem>
+                                                                       <groupId>org.apache.flink</groupId>
+                                                                       <artifactId>flink-sql-orc</artifactId>
+                                                               </artifactItem>
                                                                 <artifactItem>
                                                                         <groupId>org.apache.flink</groupId>
                                                                         <artifactId>flink-json</artifactId>
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index b9ce763bd53..6c3ba5e9559 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -228,7 +228,7 @@ Classes to define formats used together with source & sink:
     - :class:`formats.parquet.ParquetColumnarRowInputFormat`:
       A :class:`~connectors.file_system.BulkFormat` to read columnar parquet files into Row data in
       a batch-processing fashion.
-    - :class:`formats.parquet.ParquetBulkWriter`:
+    - :class:`formats.parquet.ParquetBulkWriters`:
       Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
       writes Rows with a defined RowType into Parquet files in a batch fashion.
     - :class:`formats.parquet.AvroParquetReaders`:
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 94fb040d3a8..2494693c9c2 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -31,7 +31,7 @@ from pyflink.datastream.connectors.base import SupportsPreprocessing, StreamTran
 from pyflink.datastream.functions import SinkFunction
 from pyflink.common.utils import JavaObjectWrapper
 from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import to_jarray
+from pyflink.util.java_utils import to_jarray, is_instance_of
 
 __all__ = [
     'FileCompactor',
@@ -711,10 +711,26 @@ class FileSink(Sink, SupportsPreprocessing):
             from pyflink.datastream.data_stream import DataStream
             from pyflink.table.types import _to_java_data_type
 
+            def _check_if_row_data_type(ds) -> bool:
+                j_type_info = ds._j_data_stream.getType()
+                if not is_instance_of(
+                    j_type_info,
+                    'org.apache.flink.table.runtime.typeutils.InternalTypeInfo'
+                ):
+                    return False
+                return is_instance_of(
+                    j_type_info.toLogicalType(),
+                    'org.apache.flink.table.types.logical.RowType'
+                )
+
             class RowRowTransformer(StreamTransformer):
 
                 def apply(self, ds):
                     jvm = get_gateway().jvm
+
+                    if _check_if_row_data_type(ds):
+                        return ds
+
                     j_map_function = jvm.org.apache.flink.python.util.PythonConnectorUtils \
                         .RowRowMapper(_to_java_data_type(row_type))
                     return DataStream(ds._j_data_stream.process(j_map_function))
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index 377319493ac..53f85a15441 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -19,7 +19,7 @@ from typing import Optional, TYPE_CHECKING
 
 from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory, \
     SerializationSchema, DeserializationSchema
-from pyflink.common.typeinfo import _from_java_type, TypeInformation
+from pyflink.common.typeinfo import TypeInformation
 from pyflink.datastream.connectors.file_system import StreamFormat
 from pyflink.java_gateway import get_gateway
 
@@ -56,15 +56,6 @@ class CsvSchema(object):
         """
         return CsvSchemaBuilder()
 
-    def get_type_info(self):
-        if self._type_info is None:
-            from pyflink.table.types import _to_java_data_type
-            jvm = get_gateway().jvm
-            j_type_info = jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter \
-                .toLegacyTypeInfo(_to_java_data_type(self._row_type))
-            self._type_info = _from_java_type(j_type_info)
-        return self._type_info
-
     def size(self):
         return self._j_schema.size()
 
@@ -351,8 +342,7 @@ class CsvBulkWriters(object):
         ...     .build()
         >>> sink = FileSink.for_bulk_format(
         ...     OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
-        >>> # If ds is a source stream, an identity map before sink is required
-        >>> ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
+        >>> ds.sink_to(sink)
 
     .. versionadded:: 1.16.0
     """
diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py
index 459690dadbe..b329eaf6101 100644
--- a/flink-python/pyflink/datastream/formats/orc.py
+++ b/flink-python/pyflink/datastream/formats/orc.py
@@ -36,6 +36,22 @@ class OrcBulkWriters(object):
     Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
     writes records with a predefined schema into Orc files in a batch fashion.
 
+    Example:
+    ::
+
+        >>> row_type = DataTypes.ROW([
+        ...     DataTypes.FIELD('string', DataTypes.STRING()),
+        ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+        ... ])
+        >>> sink = FileSink.for_bulk_format(
+        ...     OUTPUT_DIR, OrcBulkWriters.for_row_type(
+        ...         row_type=row_type,
+        ...         writer_properties=Configuration(),
+        ...         hadoop_config=Configuration(),
+        ...     )
+        ... ).build()
+        >>> ds.sink_to(sink)
+
     .. versionadded:: 1.16.0
     """
 
@@ -48,33 +64,9 @@ class OrcBulkWriters(object):
         Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
         with a predefined schema into Orc files in a batch fashion.
 
-        Example:
-        ::
-
-            >>> row_type = DataTypes.ROW([
-            ...     DataTypes.FIELD('string', DataTypes.STRING()),
-            ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
-            ... ])
-            >>> row_type_info = Types.ROW_NAMED(
-            ...     ['string', 'int_array'],
-            ...     [Types.STRING(), Types.LIST(Types.INT())]
-            ... )
-            >>> sink = FileSink.for_bulk_format(
-            ...     OUTPUT_DIR, OrcBulkWriters.for_row_type(
-            ...         row_type=row_type,
-            ...         writer_properties=Configuration(),
-            ...         hadoop_config=Configuration(),
-            ...     )
-            ... ).build()
-            >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-
-        Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
-        before ``sink_to`` when ``ds`` is a source stream producing **RowData** records,
-        because RowDataBulkWriterFactory assumes the input record type is Row.
-
-        :param row_type: Row type of orc table.
-        :param writer_properties: Properties that can be used in ORC WriterOptions.
-        :param hadoop_config: Hadoop configurations used in ORC WriterOptions.
+        :param row_type: The RowType of records, it should match the RowTypeInfo of Row records.
+        :param writer_properties: Orc writer options.
+        :param hadoop_config: Hadoop configuration.
         """
         from pyflink.table.types import RowType
         if not isinstance(row_type, RowType):
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index 83aef589f9f..b6600d46b04 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -168,6 +168,22 @@ class ParquetBulkWriters(object):
     Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
     writes records with a predefined schema into Parquet files in a batch fashion.
 
+    Example:
+    ::
+
+        >>> row_type = DataTypes.ROW([
+        ...     DataTypes.FIELD('string', DataTypes.STRING()),
+        ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+        ... ])
+        >>> sink = FileSink.for_bulk_format(
+        ...     OUTPUT_DIR, ParquetBulkWriters.for_row_type(
+        ...         row_type,
+        ...         hadoop_config=Configuration(),
+        ...         utc_timestamp=True,
+        ...     )
+        ... ).build()
+        >>> ds.sink_to(sink)
+
     .. versionadded:: 1.16.0
     """
 
@@ -179,34 +195,11 @@ class ParquetBulkWriters(object):
         Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
         with a predefined schema into Parquet files in a batch fashion.
 
-        Example:
-        ::
-
-            >>> row_type = DataTypes.ROW([
-            ...     DataTypes.FIELD('string', DataTypes.STRING()),
-            ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
-            ... ])
-            >>> row_type_info = Types.ROW_NAMED(
-            ...     ['string', 'int_array'],
-            ...     [Types.STRING(), Types.LIST(Types.INT())]
-            ... )
-            >>> sink = FileSink.for_bulk_format(
-            ...     OUTPUT_DIR, ParquetBulkWriters.for_row_type(
-            ...         row_type,
-            ...         hadoop_config=Configuration(),
-            ...         utc_timestamp=True,
-            ...     )
-            ... ).build()
-            >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-
-        Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
-        before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, because
-        RowDataBulkWriterFactory assumes the input record type is **Row**.
-
-        :param row_type: Row type of parquet table.
-        :param hadoop_config: Haodop configurations.
-        :param utc_timestamp: Whether to use UTC timezone or local timezone to the conversion
-                              between epoch time and LocalDateTime.
+        :param row_type: The RowType of records, it should match the RowTypeInfo of Row records.
+        :param hadoop_config: Hadoop configuration.
+        :param utc_timestamp: Use UTC timezone or local timezone to the conversion between epoch
+            time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC
+            timezone.
         """
         if not hadoop_config:
             hadoop_config = Configuration()
diff --git a/flink-python/pyflink/datastream/formats/tests/test_csv.py b/flink-python/pyflink/datastream/formats/tests/test_csv.py
index c2756923446..d1c62f618e2 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_csv.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_csv.py
@@ -167,7 +167,7 @@ class FileSinkCsvBulkWriterTests(PyFlinkStreamingTestCase):
         sink = FileSink.for_bulk_format(
             self.csv_dir_name, CsvBulkWriters.for_schema(schema)
         ).build()
-        ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
+        ds.sink_to(sink)
 
     def _read_csv_file(self) -> List[str]:
         lines = []
diff --git a/flink-python/pyflink/datastream/formats/tests/test_orc.py b/flink-python/pyflink/datastream/formats/tests/test_orc.py
index 77ce3b085de..9566d7b079c 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_orc.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_orc.py
@@ -21,7 +21,7 @@ import tempfile
 import unittest
 from datetime import date, datetime
 from decimal import Decimal
-from typing import List, Optional, Tuple
+from typing import List, Tuple
 
 import pandas as pd
 
@@ -39,7 +39,7 @@ from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, to_java_da
 
 @unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
                  'Some Hadoop lib is needed for Orc format tests')
-class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
+class FileSinkOrcBulkWriterTests(PyFlinkStreamingTestCase):
 
     def setUp(self):
         super().setUp()
@@ -54,13 +54,8 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
         _check_orc_basic_results(self, results)
 
     def test_orc_array_write(self):
-        (
-            row_type,
-            row_type_info,
-            conversion_row_type_info,
-            data,
-        ) = _create_parquet_array_row_and_data()
-        self._build_orc_job(row_type, row_type_info, data, conversion_row_type_info)
+        row_type, row_type_info, data = _create_parquet_array_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data)
         self.env.execute()
         results = self._read_orc_file()
         _check_parquet_array_results(self, results)
@@ -72,13 +67,7 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
         results = self._read_orc_file()
         _check_parquet_map_results(self, results)
 
-    def _build_orc_job(
-        self,
-        row_type: RowType,
-        row_type_info: RowTypeInfo,
-        data: List[Row],
-        conversion_type_info: Optional[RowTypeInfo] = None,
-    ):
+    def _build_orc_job(self, row_type: RowType, row_type_info: RowTypeInfo, data: List[Row]):
         jvm = get_gateway().jvm
         sink = FileSink.for_bulk_format(
             self.orc_dir_name, OrcBulkWriters.for_row_type(row_type)
@@ -90,8 +79,6 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
             j_list,
             row_type_info.get_java_type_info()
         ))
-        if conversion_type_info:
-            ds = ds.map(lambda e: e, output_type=conversion_type_info)
         ds.sink_to(sink)
 
     def _read_orc_file(self):
@@ -104,7 +91,6 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
 
 
 def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
-    jvm = get_gateway().jvm
     row_type = DataTypes.ROW([
         DataTypes.FIELD('char', DataTypes.CHAR(10)),
         DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)),
@@ -114,15 +100,15 @@ def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
         DataTypes.FIELD('int', DataTypes.INT()),
         DataTypes.FIELD('bigint', DataTypes.BIGINT()),
         DataTypes.FIELD('double', DataTypes.DOUBLE()),
-        DataTypes.FIELD('date', DataTypes.DATE()),
-        DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3)),
+        DataTypes.FIELD('date', DataTypes.DATE().bridged_to('java.sql.Date')),
+        DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3).bridged_to('java.sql.Timestamp')),
     ])
     row_type_info = Types.ROW_NAMED(
         ['char', 'varchar', 'bytes', 'boolean', 'decimal', 'int', 'bigint', 'double',
          'date', 'timestamp'],
         [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(),
-         Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(),
-         Types.JAVA(jvm.java.time.LocalTime), Types.JAVA(jvm.java.time.LocalDateTime)]
+         Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(), Types.SQL_DATE(),
+         Types.SQL_TIMESTAMP()]
     )
     data = [Row(
         char='char',
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index d7ee2cdd30e..40e8ad3cc97 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -23,7 +23,7 @@ import tempfile
 import time
 import unittest
 from decimal import Decimal
-from typing import List, Optional, Tuple
+from typing import List, Tuple
 
 import pandas as pd
 import pytz
@@ -242,13 +242,8 @@ class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
         _check_parquet_basic_results(self, results)
 
     def test_parquet_row_data_array_write(self):
-        (
-            row_type,
-            row_type_info,
-            conversion_row_type_info,
-            data,
-        ) = _create_parquet_array_row_and_data()
-        self._build_parquet_job(row_type, row_type_info, data, conversion_row_type_info)
+        row_type, row_type_info, data = _create_parquet_array_row_and_data()
+        self._build_parquet_job(row_type, row_type_info, data)
         self.env.execute('test_parquet_row_data_array_write')
         results = self._read_parquet_file()
         _check_parquet_array_results(self, results)
@@ -262,19 +257,11 @@ class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
         results = self._read_parquet_file()
         _check_parquet_map_results(self, results)
 
-    def _build_parquet_job(
-        self,
-        row_type: RowType,
-        row_type_info: RowTypeInfo,
-        data: List[Row],
-        conversion_type_info: Optional[RowTypeInfo] = None,
-    ):
+    def _build_parquet_job(self, row_type: RowType, row_type_info: RowTypeInfo, data: List[Row]):
         sink = FileSink.for_bulk_format(
             self.parquet_dir_name, ParquetBulkWriters.for_row_type(row_type, utc_timestamp=True)
         ).build()
         ds = self.env.from_collection(data, type_info=row_type_info)
-        if conversion_type_info:
-            ds = ds.map(lambda e: e, output_type=conversion_type_info)
         ds.sink_to(sink)
 
     def _read_parquet_file(self):
@@ -380,10 +367,16 @@ def _check_parquet_basic_results(test, results):
     )
 
 
-def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, RowTypeInfo, List[Row]]:
+def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
     row_type = DataTypes.ROW([
-        DataTypes.FIELD('string_array', DataTypes.ARRAY(DataTypes.STRING())),
-        DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())),
+        DataTypes.FIELD(
+            'string_array',
+            DataTypes.ARRAY(DataTypes.STRING()).bridged_to('java.util.ArrayList')
+        ),
+        DataTypes.FIELD(
+            'int_array',
+            DataTypes.ARRAY(DataTypes.INT()).bridged_to('java.util.ArrayList')
+        ),
     ])
     row_type_info = Types.ROW_NAMED([
         'string_array',
@@ -392,18 +385,11 @@ def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, RowTypeI
         Types.LIST(Types.STRING()),
         Types.LIST(Types.INT()),
     ])
-    conversion_row_type_info = Types.ROW_NAMED([
-        'string_array',
-        'int_array',
-    ], [
-        Types.OBJECT_ARRAY(Types.STRING()),
-        Types.OBJECT_ARRAY(Types.INT()),
-    ])
     data = [Row(
         string_array=['a', 'b', 'c'],
         int_array=[1, 2, 3],
     )]
-    return row_type, row_type_info, conversion_row_type_info, data
+    return row_type, row_type_info, data
 
 
 def _check_parquet_array_results(test, results):
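
A minimal end-to-end sketch of the CSV scenario the documentation changes above describe
(paths are placeholders; CsvReaderFormat and FileSource are used as in the 1.16 docs): a
CSV source emits RowData records, and with this change the stream can be written through
CsvBulkWriters without the former ds.map(lambda e: e, output_type=schema.get_type_info())
step.

    from pyflink.common import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.file_system import FileSink, FileSource
    from pyflink.datastream.formats.csv import CsvBulkWriters, CsvReaderFormat, CsvSchema
    from pyflink.table.types import DataTypes

    env = StreamExecutionEnvironment.get_execution_environment()

    schema = CsvSchema.builder() \
        .add_number_column('id', number_type=DataTypes.BIGINT()) \
        .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
        .set_column_separator(',') \
        .build()

    # The CSV source produces RowData records.
    source = FileSource.for_record_stream_format(
        CsvReaderFormat.for_schema(schema), '/tmp/csv-input').build()
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')

    sink = FileSink.for_bulk_format(
        '/tmp/csv-output', CsvBulkWriters.for_schema(schema)).build()

    # The sink now converts RowData to Row internally, so no identity map is required.
    ds.sink_to(sink)
    env.execute('csv_roundtrip_example')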
