This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6844be2128d [FLINK-28895][python] Perform RowRowConverter automatically when writing RowData into sink
6844be2128d is described below
commit 6844be2128dd4c6ec62fe6e0b5230d885393e1ce
Author: Juntao Hu <[email protected]>
AuthorDate: Wed Aug 10 10:21:38 2022 +0800
[FLINK-28895][python] Perform RowRowConverter automatically when writing RowData into sink
This closes #20525.
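In practice this removes the identity `map(lambda e: e, output_type=...)` step the documentation previously required before `sink_to`: the sink now detects a `RowData`-producing upstream and inserts the conversion itself. A minimal sketch of the resulting usage, assuming an existing DataStream `ds` and an illustrative `OUTPUT_DIR` path (both hypothetical here):

```python
# Sketch only: `ds` and OUTPUT_DIR are assumed to exist; names are illustrative.
from pyflink.common import Configuration
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.datastream.formats.parquet import ParquetBulkWriters
from pyflink.table import DataTypes

row_type = DataTypes.ROW([
    DataTypes.FIELD('name', DataTypes.STRING()),
    DataTypes.FIELD('age', DataTypes.INT()),
])

sink = FileSink.for_bulk_format(
    OUTPUT_DIR,
    ParquetBulkWriters.for_row_type(
        row_type,
        hadoop_config=Configuration(),
        utc_timestamp=True,
    )
).build()

# Previously a RowData-producing source (e.g. a CSV source) needed an identity map
# with an explicit output_type before sinking; that step can now be dropped:
ds.sink_to(sink)
```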
---
.../docs/connectors/datastream/filesystem.md | 23 +++-------
.../docs/connectors/datastream/formats/csv.md | 15 +++----
.../docs/connectors/datastream/filesystem.md | 23 +++-------
.../docs/connectors/datastream/formats/csv.md | 7 +---
flink-python/pom.xml | 4 ++
flink-python/pyflink/datastream/__init__.py | 2 +-
.../pyflink/datastream/connectors/file_system.py | 18 +++++++-
flink-python/pyflink/datastream/formats/csv.py | 14 +------
flink-python/pyflink/datastream/formats/orc.py | 46 +++++++++-----------
flink-python/pyflink/datastream/formats/parquet.py | 49 ++++++++++------------
.../pyflink/datastream/formats/tests/test_csv.py | 2 +-
.../pyflink/datastream/formats/tests/test_orc.py | 32 ++++----------
.../datastream/formats/tests/test_parquet.py | 42 +++++++------------
13 files changed, 106 insertions(+), 171 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 20d7a0bee38..80f49f4cb02 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -488,27 +488,22 @@ input.sinkTo(sink)
{{< /tab >}}
{{< /tabs >}}
-PyFlink users can use `ParquetBulkWriter` to create a `BulkWriterFactory` that writes `Row` data into Parquet files.
+PyFlink users can use `ParquetBulkWriters` to create a `BulkWriterFactory` that writes `Row` data into Parquet files.
```python
row_type = DataTypes.ROW([
DataTypes.FIELD('string', DataTypes.STRING()),
DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
])
-row_type_info = Types.ROW_NAMED(
- ['string', 'int_array'],
- [Types.STRING(), Types.LIST(Types.INT())]
-)
+
sink = FileSink.for_bulk_format(
- OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+ OUTPUT_DIR, ParquetBulkWriters.for_row_type(
row_type,
hadoop_config=Configuration(),
utc_timestamp=True,
)
).build()
-# If ds is a source whose output type is RowData, a map can be used to convert it to Row
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# Otherwise
+
ds.sink_to(sink)
```
@@ -811,8 +806,7 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
{{< /tab >}}
{{< /tabs >}}
-PyFlink users can use `OrcBulkWriters.for_row_type` to create a `BulkWriterFactory` that writes `Row` data into Orc files.
-Note that if the output type of the operator preceding the sink is `RowData`, e.g. a CSV source, it must be converted to `Row` first.
+PyFlink users can use `OrcBulkWriters` to create a `BulkWriterFactory` that writes data into Orc files.
{{< py_download_link "orc" >}}
@@ -821,10 +815,6 @@ row_type = DataTypes.ROW([
DataTypes.FIELD('name', DataTypes.STRING()),
DataTypes.FIELD('age', DataTypes.INT()),
])
-row_type_info = Types.ROW_NAMED(
- ['name', 'age'],
- [Types.STRING(), Types.INT()]
-)
sink = FileSink.for_bulk_format(
OUTPUT_DIR,
@@ -835,9 +825,6 @@ sink = FileSink.for_bulk_format(
)
).build()
-# If ds is a source producing RowData, a map function can be used to specify the corresponding Row type.
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# Otherwise
ds.sink_to(sink)
```
diff --git a/docs/content.zh/docs/connectors/datastream/formats/csv.md b/docs/content.zh/docs/connectors/datastream/formats/csv.md
index 2d3ef10455f..94aedd824af 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/csv.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/csv.md
@@ -138,20 +138,17 @@ The corresponding CSV file:
Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continuous and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}}) for examples).
-For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
-It should be noted that if the preceding operator of sink is an operator which produces `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
+For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write records to files in CSV format.
+
```python
-schema = CsvSchema.builder()
- .add_number_column('id', number_type=DataTypes.BIGINT())
- .add_array_column('array', separator='#', element_type=DataTypes.INT())
- .set_column_separator(',')
+schema = CsvSchema.builder() \
+ .add_number_column('id', number_type=DataTypes.BIGINT()) \
+ .add_array_column('array', separator='#', element_type=DataTypes.INT()) \
+ .set_column_separator(',') \
.build()
sink = FileSink.for_bulk_format(
OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
-# Else
ds.sink_to(sink)
```
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index e5e39646a43..c1f047c6e86 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -486,27 +486,22 @@ input.sinkTo(sink)
{{< /tab >}}
{{< /tabs >}}
-For PyFlink users, `ParquetBulkWriter` could be used to create a `BulkWriterFactory` that writes `Row`s into Parquet files.
+For PyFlink users, `ParquetBulkWriters` could be used to create a `BulkWriterFactory` that writes `Row`s into Parquet files.
```python
row_type = DataTypes.ROW([
DataTypes.FIELD('string', DataTypes.STRING()),
DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
])
-row_type_info = Types.ROW_NAMED(
- ['string', 'int_array'],
- [Types.STRING(), Types.LIST(Types.INT())]
-)
+
sink = FileSink.for_bulk_format(
- OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+ OUTPUT_DIR, ParquetBulkWriters.for_row_type(
row_type,
hadoop_config=Configuration(),
utc_timestamp=True,
)
).build()
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# Else
+
ds.sink_to(sink)
```
@@ -816,8 +811,7 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
{{< /tab >}}
{{< /tabs >}}
-For PyFlink users, `OrcBulkWriters.for_row_type` could be used to create `BulkWriterFactory` to write `Row` records to files in Orc format.
-It should be noted that if the preceding operator of sink is an operator producing `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
+For PyFlink users, `OrcBulkWriters` could be used to create `BulkWriterFactory` to write records to files in Orc format.
{{< py_download_link "orc" >}}
@@ -826,10 +820,6 @@ row_type = DataTypes.ROW([
DataTypes.FIELD('name', DataTypes.STRING()),
DataTypes.FIELD('age', DataTypes.INT()),
])
-row_type_info = Types.ROW_NAMED(
- ['name', 'age'],
- [Types.STRING(), Types.INT()]
-)
sink = FileSink.for_bulk_format(
OUTPUT_DIR,
@@ -840,9 +830,6 @@ sink = FileSink.for_bulk_format(
)
).build()
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-# Else
ds.sink_to(sink)
```
diff --git a/docs/content/docs/connectors/datastream/formats/csv.md b/docs/content/docs/connectors/datastream/formats/csv.md
index 60cacc9d5c8..94aedd824af 100644
--- a/docs/content/docs/connectors/datastream/formats/csv.md
+++ b/docs/content/docs/connectors/datastream/formats/csv.md
@@ -138,8 +138,8 @@ The corresponding CSV file:
Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continuous and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}}) for examples).
-For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write `Row` records to files in CSV format.
-It should be noted that if the preceding operator of sink is an operator which produces `RowData` records, e.g. CSV source, it needs to be converted to `Row` records before writing to sink.
+For PyFlink users, `CsvBulkWriters` could be used to create `BulkWriterFactory` to write records to files in CSV format.
+
```python
schema = CsvSchema.builder() \
.add_number_column('id', number_type=DataTypes.BIGINT()) \
@@ -150,8 +150,5 @@ schema = CsvSchema.builder() \
sink = FileSink.for_bulk_format(
OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
-# If ds is a source stream producing RowData records, a map could be added to help converting RowData records into Row records.
-ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
-# Else
ds.sink_to(sink)
```
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 5922e98a7e4..cb61c211806 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -512,6 +512,10 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parquet</artifactId>
</artifactItem>
+ <artifactItem>
+     <groupId>org.apache.flink</groupId>
+     <artifactId>flink-sql-orc</artifactId>
+ </artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index b9ce763bd53..6c3ba5e9559 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -228,7 +228,7 @@ Classes to define formats used together with source & sink:
- :class:`formats.parquet.ParquetColumnarRowInputFormat`:
A :class:`~connectors.file_system.BulkFormat` to read columnar parquet files into Row data in
a batch-processing fashion.
- - :class:`formats.parquet.ParquetBulkWriter`:
+ - :class:`formats.parquet.ParquetBulkWriters`:
Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
writes Rows with a defined RowType into Parquet files in a batch fashion.
- :class:`formats.parquet.AvroParquetReaders`:
diff --git a/flink-python/pyflink/datastream/connectors/file_system.py b/flink-python/pyflink/datastream/connectors/file_system.py
index 94fb040d3a8..2494693c9c2 100644
--- a/flink-python/pyflink/datastream/connectors/file_system.py
+++ b/flink-python/pyflink/datastream/connectors/file_system.py
@@ -31,7 +31,7 @@ from pyflink.datastream.connectors.base import SupportsPreprocessing, StreamTran
from pyflink.datastream.functions import SinkFunction
from pyflink.common.utils import JavaObjectWrapper
from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import to_jarray
+from pyflink.util.java_utils import to_jarray, is_instance_of
__all__ = [
'FileCompactor',
@@ -711,10 +711,26 @@ class FileSink(Sink, SupportsPreprocessing):
from pyflink.datastream.data_stream import DataStream
from pyflink.table.types import _to_java_data_type
+ def _check_if_row_data_type(ds) -> bool:
+ j_type_info = ds._j_data_stream.getType()
+ if not is_instance_of(
+ j_type_info,
+ 'org.apache.flink.table.runtime.typeutils.InternalTypeInfo'
+ ):
+ return False
+ return is_instance_of(
+ j_type_info.toLogicalType(),
+ 'org.apache.flink.table.types.logical.RowType'
+ )
+
class RowRowTransformer(StreamTransformer):
def apply(self, ds):
jvm = get_gateway().jvm
+
+ if _check_if_row_data_type(ds):
+ return ds
+
j_map_function = jvm.org.apache.flink.python.util.PythonConnectorUtils \
    .RowRowMapper(_to_java_data_type(row_type))
return DataStream(ds._j_data_stream.process(j_map_function))
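For clarity, the check introduced in this hunk can be read as a small standalone predicate. The following is a hedged restatement of the added `_check_if_row_data_type` helper (no new API; `ds` is assumed to be an existing PyFlink DataStream):

```python
# Hedged restatement of the helper added above, for readability only.
from pyflink.util.java_utils import is_instance_of

def produces_row_data(ds) -> bool:
    """Return True if the upstream stream already emits RowData records."""
    j_type_info = ds._j_data_stream.getType()
    # RowData streams carry an InternalTypeInfo whose logical type is a RowType;
    # in that case the RowRowMapper preprocessing step is skipped.
    if not is_instance_of(
            j_type_info,
            'org.apache.flink.table.runtime.typeutils.InternalTypeInfo'):
        return False
    return is_instance_of(
        j_type_info.toLogicalType(),
        'org.apache.flink.table.types.logical.RowType')
```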
diff --git a/flink-python/pyflink/datastream/formats/csv.py b/flink-python/pyflink/datastream/formats/csv.py
index 377319493ac..53f85a15441 100644
--- a/flink-python/pyflink/datastream/formats/csv.py
+++ b/flink-python/pyflink/datastream/formats/csv.py
@@ -19,7 +19,7 @@ from typing import Optional, TYPE_CHECKING
from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory, \
SerializationSchema, DeserializationSchema
-from pyflink.common.typeinfo import _from_java_type, TypeInformation
+from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.connectors.file_system import StreamFormat
from pyflink.java_gateway import get_gateway
@@ -56,15 +56,6 @@ class CsvSchema(object):
"""
return CsvSchemaBuilder()
- def get_type_info(self):
- if self._type_info is None:
- from pyflink.table.types import _to_java_data_type
- jvm = get_gateway().jvm
- j_type_info = jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter \
- .toLegacyTypeInfo(_to_java_data_type(self._row_type))
- self._type_info = _from_java_type(j_type_info)
- return self._type_info
-
def size(self):
return self._j_schema.size()
@@ -351,8 +342,7 @@ class CsvBulkWriters(object):
... .build()
>>> sink = FileSink.for_bulk_format(
... OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
- >>> # If ds is a source stream, an identity map before sink is required
- >>> ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
+ >>> ds.sink_to(sink)
.. versionadded:: 1.16.0
"""
diff --git a/flink-python/pyflink/datastream/formats/orc.py b/flink-python/pyflink/datastream/formats/orc.py
index 459690dadbe..b329eaf6101 100644
--- a/flink-python/pyflink/datastream/formats/orc.py
+++ b/flink-python/pyflink/datastream/formats/orc.py
@@ -36,6 +36,22 @@ class OrcBulkWriters(object):
Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
writes records with a predefined schema into Orc files in a batch fashion.
+ Example:
+ ::
+
+ >>> row_type = DataTypes.ROW([
+ ... DataTypes.FIELD('string', DataTypes.STRING()),
+ ... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+ ... ])
+ >>> sink = FileSink.for_bulk_format(
+ ... OUTPUT_DIR, OrcBulkWriters.for_row_type(
+ ... row_type=row_type,
+ ... writer_properties=Configuration(),
+ ... hadoop_config=Configuration(),
+ ... )
+ ... ).build()
+ >>> ds.sink_to(sink)
+
.. versionadded:: 1.16.0
"""
@@ -48,33 +64,9 @@ class OrcBulkWriters(object):
Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
with a predefined schema into Orc files in a batch fashion.
- Example:
- ::
-
- >>> row_type = DataTypes.ROW([
- ... DataTypes.FIELD('string', DataTypes.STRING()),
- ... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
- ... ])
- >>> row_type_info = Types.ROW_NAMED(
- ... ['string', 'int_array'],
- ... [Types.STRING(), Types.LIST(Types.INT())]
- ... )
- >>> sink = FileSink.for_bulk_format(
- ... OUTPUT_DIR, OrcBulkWriters.for_row_type(
- ... row_type=row_type,
- ... writer_properties=Configuration(),
- ... hadoop_config=Configuration(),
- ... )
- ... ).build()
- >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-
- Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
- before ``sink_to`` when ``ds`` is a source stream producing **RowData** records,
- because RowDataBulkWriterFactory assumes the input record type is Row.
-
- :param row_type: Row type of orc table.
- :param writer_properties: Properties that can be used in ORC WriterOptions.
- :param hadoop_config: Hadoop configurations used in ORC WriterOptions.
+ :param row_type: The RowType of records, it should match the RowTypeInfo of Row records.
+ :param writer_properties: Orc writer options.
+ :param hadoop_config: Hadoop configuration.
"""
from pyflink.table.types import RowType
if not isinstance(row_type, RowType):
diff --git a/flink-python/pyflink/datastream/formats/parquet.py b/flink-python/pyflink/datastream/formats/parquet.py
index 83aef589f9f..b6600d46b04 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -168,6 +168,22 @@ class ParquetBulkWriters(object):
Convenient builder to create a :class:`~pyflink.common.serialization.BulkWriterFactory` that
writes records with a predefined schema into Parquet files in a batch fashion.
+ Example:
+ ::
+
+ >>> row_type = DataTypes.ROW([
+ ... DataTypes.FIELD('string', DataTypes.STRING()),
+ ... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+ ... ])
+ >>> sink = FileSink.for_bulk_format(
+ ... OUTPUT_DIR, ParquetBulkWriters.for_row_type(
+ ... row_type,
+ ... hadoop_config=Configuration(),
+ ... utc_timestamp=True,
+ ... )
+ ... ).build()
+ >>> ds.sink_to(sink)
+
.. versionadded:: 1.16.0
"""
@@ -179,34 +195,11 @@ class ParquetBulkWriters(object):
Create a :class:`~pyflink.common.serialization.BulkWriterFactory` that writes records
with a predefined schema into Parquet files in a batch fashion.
- Example:
- ::
-
- >>> row_type = DataTypes.ROW([
- ... DataTypes.FIELD('string', DataTypes.STRING()),
- ... DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
- ... ])
- >>> row_type_info = Types.ROW_NAMED(
- ... ['string', 'int_array'],
- ... [Types.STRING(), Types.LIST(Types.INT())]
- ... )
- >>> sink = FileSink.for_bulk_format(
- ... OUTPUT_DIR, ParquetBulkWriters.for_row_type(
- ... row_type,
- ... hadoop_config=Configuration(),
- ... utc_timestamp=True,
- ... )
- ... ).build()
- >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
-
- Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
- before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, because
- RowDataBulkWriterFactory assumes the input record type is **Row**.
-
- :param row_type: Row type of parquet table.
- :param hadoop_config: Haodop configurations.
- :param utc_timestamp: Whether to use UTC timezone or local timezone to the conversion
-     between epoch time and LocalDateTime.
+ :param row_type: The RowType of records, it should match the RowTypeInfo of Row records.
+ :param hadoop_config: Hadoop configuration.
+ :param utc_timestamp: Use UTC timezone or local timezone to the conversion between epoch
+     time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC
+     timezone.
"""
if not hadoop_config:
hadoop_config = Configuration()
diff --git a/flink-python/pyflink/datastream/formats/tests/test_csv.py b/flink-python/pyflink/datastream/formats/tests/test_csv.py
index c2756923446..d1c62f618e2 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_csv.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_csv.py
@@ -167,7 +167,7 @@ class FileSinkCsvBulkWriterTests(PyFlinkStreamingTestCase):
sink = FileSink.for_bulk_format(
self.csv_dir_name, CsvBulkWriters.for_schema(schema)
).build()
- ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
+ ds.sink_to(sink)
def _read_csv_file(self) -> List[str]:
lines = []
diff --git a/flink-python/pyflink/datastream/formats/tests/test_orc.py b/flink-python/pyflink/datastream/formats/tests/test_orc.py
index 77ce3b085de..9566d7b079c 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_orc.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_orc.py
@@ -21,7 +21,7 @@ import tempfile
import unittest
from datetime import date, datetime
from decimal import Decimal
-from typing import List, Optional, Tuple
+from typing import List, Tuple
import pandas as pd
@@ -39,7 +39,7 @@ from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, to_java_da
@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
'Some Hadoop lib is needed for Orc format tests')
-class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
+class FileSinkOrcBulkWriterTests(PyFlinkStreamingTestCase):
def setUp(self):
super().setUp()
@@ -54,13 +54,8 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
_check_orc_basic_results(self, results)
def test_orc_array_write(self):
- (
- row_type,
- row_type_info,
- conversion_row_type_info,
- data,
- ) = _create_parquet_array_row_and_data()
- self._build_orc_job(row_type, row_type_info, data, conversion_row_type_info)
+ row_type, row_type_info, data = _create_parquet_array_row_and_data()
+ self._build_orc_job(row_type, row_type_info, data)
self.env.execute()
results = self._read_orc_file()
_check_parquet_array_results(self, results)
@@ -72,13 +67,7 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
results = self._read_orc_file()
_check_parquet_map_results(self, results)
- def _build_orc_job(
- self,
- row_type: RowType,
- row_type_info: RowTypeInfo,
- data: List[Row],
- conversion_type_info: Optional[RowTypeInfo] = None,
- ):
+ def _build_orc_job(self, row_type: RowType, row_type_info: RowTypeInfo, data: List[Row]):
jvm = get_gateway().jvm
sink = FileSink.for_bulk_format(
self.orc_dir_name, OrcBulkWriters.for_row_type(row_type)
@@ -90,8 +79,6 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
j_list,
row_type_info.get_java_type_info()
))
- if conversion_type_info:
- ds = ds.map(lambda e: e, output_type=conversion_type_info)
ds.sink_to(sink)
def _read_orc_file(self):
@@ -104,7 +91,6 @@ class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
- jvm = get_gateway().jvm
row_type = DataTypes.ROW([
DataTypes.FIELD('char', DataTypes.CHAR(10)),
DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)),
@@ -114,15 +100,15 @@ def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
DataTypes.FIELD('int', DataTypes.INT()),
DataTypes.FIELD('bigint', DataTypes.BIGINT()),
DataTypes.FIELD('double', DataTypes.DOUBLE()),
- DataTypes.FIELD('date', DataTypes.DATE()),
- DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD('date', DataTypes.DATE().bridged_to('java.sql.Date')),
+ DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3).bridged_to('java.sql.Timestamp')),
])
row_type_info = Types.ROW_NAMED(
['char', 'varchar', 'bytes', 'boolean', 'decimal', 'int', 'bigint', 'double',
'date', 'timestamp'],
[Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(),
- Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(),
- Types.JAVA(jvm.java.time.LocalTime), Types.JAVA(jvm.java.time.LocalDateTime)]
+ Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(), Types.SQL_DATE(),
+ Types.SQL_TIMESTAMP()]
)
data = [Row(
char='char',
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index d7ee2cdd30e..40e8ad3cc97 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -23,7 +23,7 @@ import tempfile
import time
import unittest
from decimal import Decimal
-from typing import List, Optional, Tuple
+from typing import List, Tuple
import pandas as pd
import pytz
@@ -242,13 +242,8 @@ class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
_check_parquet_basic_results(self, results)
def test_parquet_row_data_array_write(self):
- (
- row_type,
- row_type_info,
- conversion_row_type_info,
- data,
- ) = _create_parquet_array_row_and_data()
- self._build_parquet_job(row_type, row_type_info, data, conversion_row_type_info)
+ row_type, row_type_info, data = _create_parquet_array_row_and_data()
+ self._build_parquet_job(row_type, row_type_info, data)
self.env.execute('test_parquet_row_data_array_write')
results = self._read_parquet_file()
_check_parquet_array_results(self, results)
@@ -262,19 +257,11 @@ class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
results = self._read_parquet_file()
_check_parquet_map_results(self, results)
- def _build_parquet_job(
- self,
- row_type: RowType,
- row_type_info: RowTypeInfo,
- data: List[Row],
- conversion_type_info: Optional[RowTypeInfo] = None,
- ):
+ def _build_parquet_job(self, row_type: RowType, row_type_info: RowTypeInfo, data: List[Row]):
sink = FileSink.for_bulk_format(
self.parquet_dir_name, ParquetBulkWriters.for_row_type(row_type, utc_timestamp=True)
).build()
ds = self.env.from_collection(data, type_info=row_type_info)
- if conversion_type_info:
- ds = ds.map(lambda e: e, output_type=conversion_type_info)
ds.sink_to(sink)
def _read_parquet_file(self):
@@ -380,10 +367,16 @@ def _check_parquet_basic_results(test, results):
)
-def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, RowTypeInfo, List[Row]]:
+def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
row_type = DataTypes.ROW([
- DataTypes.FIELD('string_array', DataTypes.ARRAY(DataTypes.STRING())),
- DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.FIELD(
+ 'string_array',
+ DataTypes.ARRAY(DataTypes.STRING()).bridged_to('java.util.ArrayList')
+ ),
+ DataTypes.FIELD(
+ 'int_array',
+ DataTypes.ARRAY(DataTypes.INT()).bridged_to('java.util.ArrayList')
+ ),
])
row_type_info = Types.ROW_NAMED([
'string_array',
@@ -392,18 +385,11 @@ def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo, RowTypeI
Types.LIST(Types.STRING()),
Types.LIST(Types.INT()),
])
- conversion_row_type_info = Types.ROW_NAMED([
- 'string_array',
- 'int_array',
- ], [
- Types.OBJECT_ARRAY(Types.STRING()),
- Types.OBJECT_ARRAY(Types.INT()),
- ])
data = [Row(
string_array=['a', 'b', 'c'],
int_array=[1, 2, 3],
)]
- return row_type, row_type_info, conversion_row_type_info, data
+ return row_type, row_type_info, data
def _check_parquet_array_results(test, results):