This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new de8ff096a53 [FLINK-28862][python][format/parquet] Support
ParquetBulkWriter
de8ff096a53 is described below
commit de8ff096a5344d85eb9be497902b99dd4b24e2a9
Author: Juntao Hu <[email protected]>
AuthorDate: Mon Aug 8 16:23:45 2022 +0800
[FLINK-28862][python][format/parquet] Support ParquetBulkWriter
This closes #20499.
---
.../docs/connectors/datastream/filesystem.md | 24 ++
.../docs/connectors/datastream/formats/parquet.md | 2 +-
.../docs/connectors/datastream/filesystem.md | 24 ++
.../docs/connectors/datastream/formats/parquet.md | 2 +-
flink-python/pyflink/common/typeinfo.py | 4 +-
flink-python/pyflink/datastream/__init__.py | 17 +-
flink-python/pyflink/datastream/formats/parquet.py | 143 +++++++----
.../datastream/formats/tests/test_parquet.py | 273 ++++++++++++++++++---
flink-python/pyflink/datastream/utils.py | 10 +-
flink-python/pyflink/fn_execution/coders.py | 12 +-
flink-python/pyflink/table/types.py | 11 +-
flink-python/pyflink/testing/test_case_utils.py | 36 ++-
.../flink/streaming/api/utils/PythonTypeUtils.java | 2 +-
13 files changed, 453 insertions(+), 107 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md
b/docs/content.zh/docs/connectors/datastream/filesystem.md
index 76e77a1a662..f5468862a53 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -488,6 +488,30 @@ input.sinkTo(sink)
{{< /tab >}}
{{< /tabs >}}
+PyFlink 用户可以使用 `ParquetBulkWriter` 来创建一个将 `Row` 数据写入 Parquet 文件的
`BulkWriterFactory` 。
+
+```python
+row_type = DataTypes.ROW([
+ DataTypes.FIELD('string', DataTypes.STRING()),
+ DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+])
+row_type_info = Types.ROW_NAMED(
+ ['string', 'int_array'],
+ [Types.STRING(), Types.LIST(Types.INT())]
+)
+sink = FileSink.for_bulk_format(
+ OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+ row_type,
+ hadoop_config=Configuration(),
+ utc_timestamp=True,
+ )
+).build()
+# 如果 ds 是一个输出类型为 RowData 的数据源，可以使用一个 map 将其转换为 Row 类型
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# 否则
+ds.sink_to(sink)
+```
+
<a name="avro-format"></a>
##### Avro Format
diff --git a/docs/content.zh/docs/connectors/datastream/formats/parquet.md
b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
index b8182fac55f..40afa9cdbd7 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -184,8 +184,8 @@ row_type = DataTypes.ROW([
DataTypes.FIELD('f99', DataTypes.VARCHAR()),
])
source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
- hadoop_config=Configuration(),
row_type=row_type,
+ hadoop_config=Configuration(),
batch_size=500,
is_utc_timestamp=False,
is_case_sensitive=True,
diff --git a/docs/content/docs/connectors/datastream/filesystem.md
b/docs/content/docs/connectors/datastream/filesystem.md
index b024052827d..f5a4517aa95 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -486,6 +486,30 @@ input.sinkTo(sink)
{{< /tab >}}
{{< /tabs >}}
+For PyFlink users, `ParquetBulkWriter` can be used to create a `BulkWriterFactory` that writes `Row` records into Parquet files.
+
+```python
+row_type = DataTypes.ROW([
+ DataTypes.FIELD('string', DataTypes.STRING()),
+ DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+])
+row_type_info = Types.ROW_NAMED(
+ ['string', 'int_array'],
+ [Types.STRING(), Types.LIST(Types.INT())]
+)
+sink = FileSink.for_bulk_format(
+ OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+ row_type,
+ hadoop_config=Configuration(),
+ utc_timestamp=True,
+ )
+).build()
+# If ds is a source stream producing RowData records, an identity map can be added
+# to convert the RowData records into Row records.
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# Otherwise
+ds.sink_to(sink)
+```
+
##### Avro format
Flink also provides built-in support for writing data into Avro files. A list
of convenience methods to create
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md
b/docs/content/docs/connectors/datastream/formats/parquet.md
index 217a88fc107..9a686c25925 100644
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -181,8 +181,8 @@ row_type = DataTypes.ROW([
DataTypes.FIELD('f99', DataTypes.VARCHAR()),
])
source = FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
- hadoop_config=Configuration(),
row_type=row_type,
+ hadoop_config=Configuration(),
batch_size=500,
is_utc_timestamp=False,
is_case_sensitive=True,
diff --git a/flink-python/pyflink/common/typeinfo.py
b/flink-python/pyflink/common/typeinfo.py
index 5558fad683e..c1ece76f9d9 100644
--- a/flink-python/pyflink/common/typeinfo.py
+++ b/flink-python/pyflink/common/typeinfo.py
@@ -439,7 +439,7 @@ class RowTypeInfo(TypeInformation):
zip(self.get_field_names(), self._field_types,
self._need_conversion))
elif isinstance(obj, Row) and hasattr(obj, "_fields"):
return (obj.get_row_kind().value,) + tuple(
- f.to_internal_type(obj.get(n)) if c else obj.get(n)
+ f.to_internal_type(obj[n]) if c else obj[n]
for n, f, c in
zip(self.get_field_names(), self._field_types,
self._need_conversion))
elif isinstance(obj, Row):
@@ -463,7 +463,7 @@ class RowTypeInfo(TypeInformation):
return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in
self.get_field_names())
elif isinstance(obj, Row) and hasattr(obj, "_fields"):
return (obj.get_row_kind().value,) + tuple(
- obj.get(n) for n in self.get_field_names())
+ obj[n] for n in self.get_field_names())
elif isinstance(obj, Row):
return (obj.get_row_kind().value,) + tuple(obj)
elif isinstance(obj, (list, tuple)):
diff --git a/flink-python/pyflink/datastream/__init__.py
b/flink-python/pyflink/datastream/__init__.py
index 11ebea667f4..4bcf0050446 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -211,20 +211,23 @@ Classes to define source & sink:
Classes to define formats used together with source & sink:
- :class:`formats.csv.CsvReaderFormat`:
- A :class:`connectors.file_system.StreamFormat` to read CSV files into
Row data.
+ A :class:`~connectors.file_system.StreamFormat` to read CSV files into
Row data.
- :class:`formats.csv.CsvBulkWriter`:
- Creates :class:`connectors.file_system.BulkWriterFactory` to write Row
data into CSV files.
+ Creates :class:`~connectors.file_system.BulkWriterFactory` to write Row
data into CSV files.
- :class:`formats.avro.GenericRecordAvroTypeInfo`:
- A :class:`pyflink.common.typeinfo.TypeInformation` to indicate vanilla
Python records will be
+ A :class:`~pyflink.common.typeinfo.TypeInformation` to indicate vanilla
Python records will be
translated to GenericRecordAvroTypeInfo on the Java side.
- :class:`formats.avro.AvroInputFormat`:
- A :class:`connector.file_system.InputFormat` to read avro files in a
streaming fashion.
+ A :class:`~connectors.file_system.InputFormat` to read avro files in a streaming fashion.
- :class:`formats.avro.AvroWriters`:
- A class to provide :class:`connector.file_system.BulkWriterFactory` to
write vanilla Python
+ A class to provide :class:`~connectors.file_system.BulkWriterFactory` to write vanilla Python
objects into avro files in a batch fashion.
- :class:`formats.parquet.ParquetColumnarRowInputFormat`:
- A :class:`connectors.file_system.BulkFormat` to read columnar parquet
files into Row data in a
- batch-processing fashion.
+ A :class:`~connectors.file_system.BulkFormat` to read columnar parquet
files into Row data in
+ a batch-processing fashion.
+ - :class:`formats.parquet.ParquetBulkWriter`:
+ Convenience builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
+ Rows with a defined RowType into Parquet files in a batch fashion.
- :class:`formats.parquet.AvroParquetReaders`:
A convenience builder to create reader format that reads individual Avro
records from a
Parquet stream. Only GenericRecord is supported in PyFlink.
diff --git a/flink-python/pyflink/datastream/formats/parquet.py
b/flink-python/pyflink/datastream/formats/parquet.py
index 39bc6a7d321..687bc35aa4d 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -15,9 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+from typing import Optional
+
from pyflink.common import Configuration
-from pyflink.datastream.connectors.file_system import StreamFormat,
BulkFormat, BulkWriterFactory
+from pyflink.datastream.connectors.file_system import StreamFormat,
BulkFormat, BulkWriterFactory, \
+ RowDataBulkWriterFactory
from pyflink.datastream.formats.avro import AvroSchema
+from pyflink.datastream.utils import create_hadoop_configuration
from pyflink.java_gateway import get_gateway
from pyflink.table.types import RowType, _to_java_data_type
@@ -25,7 +29,8 @@ from pyflink.table.types import RowType, _to_java_data_type
__all__ = [
'AvroParquetReaders',
'AvroParquetWriters',
- 'ParquetColumnarRowInputFormat'
+ 'ParquetBulkWriter',
+ 'ParquetColumnarRowInputFormat',
]
@@ -67,9 +72,53 @@ class AvroParquetReaders(object):
return
StreamFormat(JAvroParquetReaders.forGenericRecord(schema._j_schema))
+class AvroParquetWriters(object):
+ """
+ Convenience builder to create ParquetWriterFactory instances for Avro types. Only GenericRecord
+ is supported in PyFlink.
+
+ .. versionadded:: 1.16.0
+ """
+
+ @staticmethod
+ def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory':
+ """
+ Creates a ParquetWriterFactory that accepts and writes Avro generic
types. The Parquet
+ writers will use the given schema to build and write the columnar data.
+
+ Note that to make this work in PyFlink, you need to declare the output type of the
+ predecessor before FileSink to be :class:`GenericRecordAvroTypeInfo`, and the predecessor
+ cannot be :meth:`StreamExecutionEnvironment.from_collection`; you can add a pass-through map
+ function before the sink, as shown in the example below.
+
+ The Python data records should match the Avro schema, and have the
same behavior with
+ vanilla Python data structure, e.g. an object for Avro array should
behave like Python list,
+ an object for Avro map should behave like Python dict.
+
+ Example:
+ ::
+
+ >>> env = StreamExecutionEnvironment.get_execution_environment()
+ >>> schema = AvroSchema(JSON_SCHEMA)
+ >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
+ >>> ds = env.from_collection([{'array': [1, 2]}],
type_info=Types.PICKLED_BYTE_ARRAY())
+ >>> sink = FileSink.for_bulk_format(
+ ... OUTPUT_DIR,
AvroParquetWriters.for_generic_record(schema)).build()
+ >>> # A map to indicate its Avro type info is necessary for
serialization
+ >>> ds.map(lambda e: e,
output_type=GenericRecordAvroTypeInfo(schema)) \\
+ ... .sink_to(sink)
+
+ :param schema: The avro schema.
+ :return: The BulkWriterFactory to write generic records into parquet
files.
+ """
+ jvm = get_gateway().jvm
+ JAvroParquetWriters =
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
+ return
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+
+
class ParquetColumnarRowInputFormat(BulkFormat):
"""
- A ParquetVectorizedInputFormat to provide :class:`RowData` iterator. Using
ColumnarRowData to
+ A ParquetVectorizedInputFormat to provide a RowData iterator. Using ColumnarRowData to
provide a row view of column batch. Only **primitive** types are supported
for a column,
composite types such as array, map are not supported.
@@ -81,10 +130,10 @@ class ParquetColumnarRowInputFormat(BulkFormat):
... DataTypes.FIELD('b', DataTypes.STRING()),
... ])
>>> source =
FileSource.for_bulk_file_format(ParquetColumnarRowInputFormat(
- ... hadoop_config=Configuration(),
... row_type=row_type,
- ... batch_size=500,
- ... is_utc_timestamp=True,
+ ... hadoop_config=Configuration(),
+ ... batch_size=2048,
+ ... is_utc_timestamp=False,
... is_case_sensitive=True,
... ), PARQUET_FILE_PATH).build()
>>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(),
"parquet-source")
@@ -92,66 +141,72 @@ class ParquetColumnarRowInputFormat(BulkFormat):
.. versionadded:: 1.16.0
"""
- def __init__(self, hadoop_config: Configuration, row_type: RowType,
batch_size: int,
- is_utc_timestamp: bool, is_case_sensitive: bool):
+ def __init__(self,
+ row_type: RowType,
+ hadoop_config: Optional[Configuration] = None,
+ batch_size: int = 2048,
+ is_utc_timestamp: bool = False,
+ is_case_sensitive: bool = True):
+ if not hadoop_config:
+ hadoop_config = Configuration()
+
jvm = get_gateway().jvm
j_row_type = _to_java_data_type(row_type).getLogicalType()
produced_type_info = jvm.org.apache.flink.table.runtime.typeutils. \
InternalTypeInfo.of(j_row_type)
j_parquet_columnar_format = jvm.org.apache.flink.formats.parquet. \
-
ParquetColumnarRowInputFormat(self._create_hadoop_configuration(hadoop_config),
+
ParquetColumnarRowInputFormat(create_hadoop_configuration(hadoop_config),
j_row_type, produced_type_info,
batch_size,
is_utc_timestamp, is_case_sensitive)
super().__init__(j_parquet_columnar_format)
- @staticmethod
- def _create_hadoop_configuration(config: Configuration):
- jvm = get_gateway().jvm
- hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
- for k, v in config.to_dict().items():
- hadoop_config.set(k, v)
- return hadoop_config
-
-class AvroParquetWriters(object):
+class ParquetBulkWriter(object):
"""
- Convenience builder to create ParquetWriterFactory instances for Avro
types. Only GenericRecord
- is supported in PyFlink.
+ Convenience builder to create a :class:`BulkWriterFactory` that writes Rows with a defined
+ RowType into Parquet files in a batch fashion.
.. versionadded:: 1.16.0
"""
@staticmethod
- def for_generic_record(schema: 'AvroSchema') -> 'BulkWriterFactory':
+ def for_row_type(row_type: RowType, hadoop_config: Optional[Configuration]
= None,
+ utc_timestamp: bool = False) -> 'BulkWriterFactory':
"""
- Creates a ParquetWriterFactory that accepts and writes Avro generic
types. The Parquet
- writers will use the given schema to build and write the columnar data.
-
- Note that to make this works in PyFlink, you need to declare the
output type of the
- predecessor before FileSink to be :class:`GenericRecordAvroTypeInfo`,
and the predecessor
- cannot be :meth:`StreamExecutionEnvironment.from_collection`, you can
add a pass-through map
- function before the sink, as the example shown below.
-
- The Python data records should match the Avro schema, and have the
same behavior with
- vanilla Python data structure, e.g. an object for Avro array should
behave like Python list,
- an object for Avro map should behave like Python dict.
+ Creates a RowDataBulkWriterFactory that writes Row records with a defined RowType into
+ Parquet files in a batch fashion.
Example:
::
- >>> env = StreamExecutionEnvironment.get_execution_environment()
- >>> schema = AvroSchema(JSON_SCHEMA)
- >>> avro_type_info = GenericRecordAvroTypeInfo(schema)
- >>> ds = env.from_collection([{'array': [1, 2]}],
type_info=Types.PICKLED_BYTE_ARRAY())
+ >>> row_type = DataTypes.ROW([
+ ... DataTypes.FIELD('string', DataTypes.STRING()),
+ ... DataTypes.FIELD('int_array',
DataTypes.ARRAY(DataTypes.INT()))
+ ... ])
+ >>> row_type_info = Types.ROW_NAMED(
+ ... ['string', 'int_array'],
+ ... [Types.STRING(), Types.LIST(Types.INT())]
+ ... )
>>> sink = FileSink.for_bulk_format(
- ... OUTPUT_DIR,
AvroParquetWriters.for_generic_record(schema)).build()
- >>> # A map to indicate its Avro type info is necessary for
serialization
- >>> ds.map(lambda e: e,
output_type=GenericRecordAvroTypeInfo(schema)) \\
- ... .sink_to(sink)
+ ... OUTPUT_DIR, ParquetBulkWriter.for_row_type(
+ ... row_type,
+ ... hadoop_config=Configuration(),
+ ... utc_timestamp=True,
+ ... )
+ ... ).build()
+ >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
- :param schema: The avro schema.
- :return: The BulkWriterFactory to write generic records into parquet
files.
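+ Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
+ before ``sink_to`` when ``ds`` is a source stream producing **RowData** records, because
+ RowDataBulkWriterFactory assumes the input record type is **Row**.
+
+ :param row_type: The RowType of the records to write; the Parquet schema is derived from it.
+ :param hadoop_config: The Hadoop configuration passed to the Parquet writer. An empty
+     Configuration is used when it is None.
+ :param utc_timestamp: Whether to use UTC instead of the local time zone when converting
+     timestamp values. Defaults to False.
+ :return: A RowDataBulkWriterFactory that writes Rows into Parquet files.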
"""
+ if not hadoop_config:
+ hadoop_config = Configuration()
+
jvm = get_gateway().jvm
- JAvroParquetWriters =
jvm.org.apache.flink.formats.parquet.avro.AvroParquetWriters
- return
BulkWriterFactory(JAvroParquetWriters.forGenericRecord(schema._j_schema))
+ JParquetRowDataBuilder =
jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
+ return
RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
+ _to_java_data_type(row_type).getLogicalType(),
+ create_hadoop_configuration(hadoop_config),
+ utc_timestamp
+ ), row_type)
diff --git a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
index b914d185e9f..16bf47edbfe 100644
--- a/flink-python/pyflink/datastream/formats/tests/test_parquet.py
+++ b/flink-python/pyflink/datastream/formats/tests/test_parquet.py
@@ -15,15 +15,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import calendar
+import datetime
import glob
import os
import tempfile
+import time
import unittest
-from typing import List
+from decimal import Decimal
+from typing import List, Optional, Tuple
-from pyflink.common import Configuration
+import pandas as pd
+import pytz
+from pyflink.common.time import Instant
+
+from pyflink.common import Configuration, Row
+from pyflink.common.typeinfo import RowTypeInfo, Types
from pyflink.common.watermark_strategy import WatermarkStrategy
-from pyflink.datastream import MapFunction
from pyflink.datastream.connectors.file_system import FileSource, FileSink
from pyflink.datastream.formats.tests.test_avro import \
_create_basic_avro_schema_and_py_objects,
_check_basic_avro_schema_results, \
@@ -36,11 +44,12 @@ from pyflink.datastream.formats.tests.test_avro import \
_create_basic_avro_schema_and_records, _import_avro_classes
from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo,
AvroSchema
from pyflink.datastream.formats.parquet import AvroParquetReaders,
ParquetColumnarRowInputFormat, \
- AvroParquetWriters
+ AvroParquetWriters, ParquetBulkWriter
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
+from pyflink.datastream.utils import create_hadoop_configuration
from pyflink.java_gateway import get_gateway
-from pyflink.table.types import RowType, DataTypes
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+from pyflink.table.types import RowType, DataTypes, _to_java_data_type
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase,
to_java_data_structure
@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
@@ -106,7 +115,7 @@ class
FileSourceAvroParquetReadersTests(PyFlinkStreamingTestCase):
WatermarkStrategy.for_monotonous_timestamps(),
"parquet-source"
)
- ds.map(PassThroughMapFunction()).add_sink(self.test_sink)
+ ds.map(lambda e: e).add_sink(self.test_sink)
@staticmethod
def _create_parquet_avro_file(file_path: str, schema: AvroSchema, records:
list):
@@ -192,40 +201,234 @@ class
FileSourceParquetColumnarRowInputFormatTests(PyFlinkStreamingTestCase):
def setUp(self):
super().setUp()
self.test_sink = DataStreamTestSinkFunction()
- _import_avro_classes()
+ self.parquet_file_name = tempfile.mktemp(suffix='.parquet',
dir=self.tempdir)
- def test_parquet_columnar_basic(self):
- parquet_file_name = tempfile.mktemp(suffix='.parquet',
dir=self.tempdir)
- schema, records = _create_basic_avro_schema_and_records()
- FileSourceAvroParquetReadersTests._create_parquet_avro_file(
- parquet_file_name, schema, records)
- row_type = DataTypes.ROW([
- DataTypes.FIELD('null', DataTypes.STRING()), # DataTypes.NULL
cannot be serialized
- DataTypes.FIELD('boolean', DataTypes.BOOLEAN()),
- DataTypes.FIELD('int', DataTypes.INT()),
- DataTypes.FIELD('long', DataTypes.BIGINT()),
- DataTypes.FIELD('float', DataTypes.FLOAT()),
- DataTypes.FIELD('double', DataTypes.DOUBLE()),
- DataTypes.FIELD('string', DataTypes.STRING()),
- DataTypes.FIELD('unknown', DataTypes.STRING())
- ])
- self._build_parquet_columnar_job(row_type, parquet_file_name)
- self.env.execute('test_parquet_columnar_basic')
+ def test_parquet_columnar_basic_read(self):
+ os.environ['TZ'] = 'Asia/Shanghai'
+ time.tzset()
+ row_type, _, data = _create_parquet_basic_row_and_data()
+ _write_row_data_to_parquet_file(self.parquet_file_name, row_type, data)
+ self._build_parquet_columnar_job(row_type)
+ self.env.execute('test_parquet_columnar_basic_read')
results = self.test_sink.get_results(True, False)
- _check_basic_avro_schema_results(self, results)
- self.assertIsNone(results[0]['unknown'])
- self.assertIsNone(results[1]['unknown'])
+ _check_parquet_basic_results(self, results)
- def _build_parquet_columnar_job(self, row_type: RowType,
parquet_file_name: str):
+ def _build_parquet_columnar_job(self, row_type: RowType):
source = FileSource.for_bulk_file_format(
- ParquetColumnarRowInputFormat(Configuration(), row_type, 10, True,
True),
- parquet_file_name
+ ParquetColumnarRowInputFormat(row_type, Configuration(), 10, True,
False),
+ self.parquet_file_name
).build()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(),
'parquet-source')
- ds.map(PassThroughMapFunction()).add_sink(self.test_sink)
+ ds.map(lambda e: e).add_sink(self.test_sink)
+
+
+@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
+ 'Some Hadoop lib is needed for Parquet RowData format tests')
+class FileSinkParquetBulkWriterTests(PyFlinkStreamingTestCase):
+
+ def setUp(self):
+ super().setUp()
+ # NOTE: parallelism == 1 is required to keep the order of results
+ self.env.set_parallelism(1)
+ self.parquet_dir_name = tempfile.mkdtemp(dir=self.tempdir)
+
+ def test_parquet_row_data_basic_write(self):
+ os.environ['TZ'] = 'Asia/Shanghai'
+ time.tzset()
+ row_type, row_type_info, data = _create_parquet_basic_row_and_data()
+ self._build_parquet_job(row_type, row_type_info, data)
+ self.env.execute('test_parquet_row_data_basic_write')
+ results = self._read_parquet_file()
+ _check_parquet_basic_results(self, results)
+
+ def test_parquet_row_data_array_write(self):
+ (
+ row_type,
+ row_type_info,
+ conversion_row_type_info,
+ data,
+ ) = _create_parquet_array_row_and_data()
+ self._build_parquet_job(row_type, row_type_info, data,
conversion_row_type_info)
+ self.env.execute('test_parquet_row_data_array_write')
+ results = self._read_parquet_file()
+ _check_parquet_array_results(self, results)
+
+ @unittest.skip('ParquetSchemaConverter in flink-parquet annotates map keys as optional, but '
+ 'Arrow only accepts required map keys')
+ def test_parquet_row_data_map_write(self):
+ row_type, row_type_info, data = _create_parquet_map_row_and_data()
+ self._build_parquet_job(row_type, row_type_info, data)
+ self.env.execute('test_parquet_row_data_map_write')
+ results = self._read_parquet_file()
+ _check_parquet_map_results(self, results)
+
+ def _build_parquet_job(
+ self,
+ row_type: RowType,
+ row_type_info: RowTypeInfo,
+ data: List[Row],
+ conversion_type_info: Optional[RowTypeInfo] = None,
+ ):
+ sink = FileSink.for_bulk_format(
+ self.parquet_dir_name, ParquetBulkWriter.for_row_type(row_type,
utc_timestamp=True)
+ ).build()
+ ds = self.env.from_collection(data, type_info=row_type_info)
+ if conversion_type_info:
+ ds = ds.map(lambda e: e, output_type=conversion_type_info)
+ ds.sink_to(sink)
+
+ def _read_parquet_file(self):
+ records = []
+ for file in glob.glob(os.path.join(os.path.join(self.parquet_dir_name,
'**/*'))):
+ df = pd.read_parquet(file)
+ for i in range(df.shape[0]):
+ records.append(df.loc[i])
+ return records
+
+
+def _write_row_data_to_parquet_file(path: str, row_type: RowType, rows:
List[Row]):
+ jvm = get_gateway().jvm
+ flink = jvm.org.apache.flink
+
+ j_output_stream =
flink.core.fs.local.LocalDataOutputStream(jvm.java.io.File(path))
+ j_bulk_writer =
flink.formats.parquet.row.ParquetRowDataBuilder.createWriterFactory(
+ _to_java_data_type(row_type).getLogicalType(),
+ create_hadoop_configuration(Configuration()),
+ True,
+ ).create(j_output_stream)
+ row_row_converter = flink.table.data.conversion.RowRowConverter.create(
+ _to_java_data_type(row_type)
+ )
+ row_row_converter.open(row_row_converter.getClass().getClassLoader())
+ for row in rows:
+
j_bulk_writer.addElement(row_row_converter.toInternal(to_java_data_structure(row)))
+ j_bulk_writer.finish()
+
+
+def _create_parquet_basic_row_and_data() -> Tuple[RowType, RowTypeInfo,
List[Row]]:
+ row_type = DataTypes.ROW([
+ DataTypes.FIELD('char', DataTypes.CHAR(10)),
+ DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)),
+ DataTypes.FIELD('binary', DataTypes.BINARY(10)),
+ DataTypes.FIELD('varbinary', DataTypes.VARBINARY(10)),
+ DataTypes.FIELD('boolean', DataTypes.BOOLEAN()),
+ DataTypes.FIELD('decimal', DataTypes.DECIMAL(2, 0)),
+ DataTypes.FIELD('int', DataTypes.INT()),
+ DataTypes.FIELD('bigint', DataTypes.BIGINT()),
+ DataTypes.FIELD('double', DataTypes.DOUBLE()),
+ DataTypes.FIELD('date', DataTypes.DATE().bridged_to('java.sql.Date')),
+ DataTypes.FIELD('time', DataTypes.TIME().bridged_to('java.sql.Time')),
+ DataTypes.FIELD('timestamp',
DataTypes.TIMESTAMP(3).bridged_to('java.sql.Timestamp')),
+ DataTypes.FIELD('timestamp_ltz', DataTypes.TIMESTAMP_LTZ(3)),
+ ])
+ row_type_info = Types.ROW_NAMED(
+ ['char', 'varchar', 'binary', 'varbinary', 'boolean', 'decimal',
'int', 'bigint', 'double',
+ 'date', 'time', 'timestamp', 'timestamp_ltz'],
+ [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()),
+ Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(),
Types.BIG_DEC(), Types.INT(),
+ Types.LONG(), Types.DOUBLE(), Types.SQL_DATE(), Types.SQL_TIME(),
Types.SQL_TIMESTAMP(),
+ Types.INSTANT()]
+ )
+ datetime_ltz = datetime.datetime(1970, 2, 3, 4, 5, 6, 700000,
tzinfo=pytz.timezone('UTC'))
+ timestamp_ltz = Instant.of_epoch_milli(
+ (
+ calendar.timegm(datetime_ltz.utctimetuple()) +
+ calendar.timegm(time.localtime(0))
+ ) * 1000 + datetime_ltz.microsecond // 1000
+ )
+ data = [Row(
+ char='char',
+ varchar='varchar',
+ binary=b'binary',
+ varbinary=b'varbinary',
+ boolean=True,
+ decimal=Decimal(1.5),
+ int=2147483647,
+ bigint=-9223372036854775808,
+ double=2e-308,
+ date=datetime.date(1970, 1, 1),
+ time=datetime.time(1, 1, 1),
+ timestamp=datetime.datetime(1970, 1, 2, 3, 4, 5, 600000),
+ timestamp_ltz=timestamp_ltz
+ )]
+ return row_type, row_type_info, data
+
+
+def _check_parquet_basic_results(test, results):
+ row = results[0]
+ test.assertEqual(row['char'], 'char')
+ test.assertEqual(row['varchar'], 'varchar')
+ test.assertEqual(row['binary'], b'binary')
+ test.assertEqual(row['varbinary'], b'varbinary')
+ test.assertEqual(row['boolean'], True)
+ test.assertAlmostEqual(row['decimal'], 2)
+ test.assertEqual(row['int'], 2147483647)
+ test.assertEqual(row['bigint'], -9223372036854775808)
+ test.assertAlmostEqual(row['double'], 2e-308, delta=1e-311)
+ test.assertEqual(row['date'], datetime.date(1970, 1, 1))
+ test.assertEqual(row['time'], datetime.time(1, 1, 1))
+ ts = row['timestamp']
+ if isinstance(ts, pd.Timestamp):
+ ts = ts.to_pydatetime()
+ test.assertEqual(ts, datetime.datetime(1970, 1, 2, 3, 4, 5, 600000))
+ ts_ltz = row['timestamp_ltz']
+ if isinstance(ts_ltz, pd.Timestamp):
+ ts_ltz =
pytz.timezone('Asia/Shanghai').localize(ts_ltz.to_pydatetime())
+ test.assertEqual(
+ ts_ltz,
+ pytz.timezone('Asia/Shanghai').localize(datetime.datetime(1970, 2, 3,
12, 5, 6, 700000))
+ )
+
+
+def _create_parquet_array_row_and_data() -> Tuple[RowType, RowTypeInfo,
RowTypeInfo, List[Row]]:
+ row_type = DataTypes.ROW([
+ DataTypes.FIELD('string_array', DataTypes.ARRAY(DataTypes.STRING())),
+ DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT())),
+ ])
+ row_type_info = Types.ROW_NAMED([
+ 'string_array',
+ 'int_array',
+ ], [
+ Types.LIST(Types.STRING()),
+ Types.LIST(Types.INT()),
+ ])
+ conversion_row_type_info = Types.ROW_NAMED([
+ 'string_array',
+ 'int_array',
+ ], [
+ Types.OBJECT_ARRAY(Types.STRING()),
+ Types.OBJECT_ARRAY(Types.INT()),
+ ])
+ data = [Row(
+ string_array=['a', 'b', 'c'],
+ int_array=[1, 2, 3],
+ )]
+ return row_type, row_type_info, conversion_row_type_info, data
+
+
+def _check_parquet_array_results(test, results):
+ row = results[0]
+ test.assertEqual(row['string_array'][0], 'a')
+ test.assertEqual(row['string_array'][1], 'b')
+ test.assertEqual(row['string_array'][2], 'c')
+ test.assertEqual(row['int_array'][0], 1)
+ test.assertEqual(row['int_array'][1], 2)
+ test.assertEqual(row['int_array'][2], 3)
+
+def _create_parquet_map_row_and_data() -> Tuple[RowType, RowTypeInfo,
List[Row]]:
+ row_type = DataTypes.ROW([
+ DataTypes.FIELD('map', DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING())),
+ ])
+ row_type_info = Types.ROW_NAMED(['map'], [Types.MAP(Types.INT(),
Types.STRING())])
+ data = [Row(
+ map={0: 'a', 1: 'b', 2: 'c'}
+ )]
+ return row_type, row_type_info, data
-class PassThroughMapFunction(MapFunction):
- def map(self, value):
- return value
+def _check_parquet_map_results(test, results):
+ m = {k: v for k, v in results[0]['map']}
+ test.assertEqual(m[0], 'a')
+ test.assertEqual(m[1], 'b')
+ test.assertEqual(m[2], 'c')
diff --git a/flink-python/pyflink/datastream/utils.py
b/flink-python/pyflink/datastream/utils.py
index 73ec0faa1bf..f1bdb186177 100644
--- a/flink-python/pyflink/datastream/utils.py
+++ b/flink-python/pyflink/datastream/utils.py
@@ -20,7 +20,7 @@ import datetime
import pickle
from abc import abstractmethod
-from pyflink.common import Row, RowKind
+from pyflink.common import Row, RowKind, Configuration
from pyflink.common.typeinfo import (RowTypeInfo, TupleTypeInfo, Types,
BasicArrayTypeInfo,
PrimitiveArrayTypeInfo, MapTypeInfo,
ListTypeInfo,
ObjectArrayTypeInfo, ExternalTypeInfo,
TypeInformation)
@@ -43,6 +43,14 @@ class JavaObjectWrapper(object):
return self._j_object
+def create_hadoop_configuration(config: Configuration):
+ jvm = get_gateway().jvm
+ hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
+ for k, v in config.to_dict().items():
+ hadoop_config.set(k, v)
+ return hadoop_config
+
+
def convert_to_python_obj(data, type_info):
if type_info == Types.PICKLED_BYTE_ARRAY():
return pickle.loads(data)
diff --git a/flink-python/pyflink/fn_execution/coders.py
b/flink-python/pyflink/fn_execution/coders.py
index de1c27777d3..1527adb4e39 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -725,13 +725,17 @@ def from_type_info_proto(type_info):
return RowCoder(
[from_type_info_proto(f.field_type) for f in
type_info.row_type_info.fields],
[f.field_name for f in type_info.row_type_info.fields])
- elif field_type_name == type_info_name.PRIMITIVE_ARRAY:
+ elif field_type_name in (
+ type_info_name.PRIMITIVE_ARRAY,
+ type_info_name.LIST,
+ ):
if type_info.collection_element_type.type_name ==
type_info_name.BYTE:
return BinaryCoder()
return
PrimitiveArrayCoder(from_type_info_proto(type_info.collection_element_type))
- elif field_type_name in (type_info_name.BASIC_ARRAY,
- type_info_name.OBJECT_ARRAY,
- type_info_name.LIST):
+ elif field_type_name in (
+ type_info_name.BASIC_ARRAY,
+ type_info_name.OBJECT_ARRAY,
+ ):
return
GenericArrayCoder(from_type_info_proto(type_info.collection_element_type))
elif field_type_name == type_info_name.TUPLE:
return TupleCoder([from_type_info_proto(field_type)
diff --git a/flink-python/pyflink/table/types.py
b/flink-python/pyflink/table/types.py
index add5b042f7d..3c3d2a33b02 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -1250,7 +1250,7 @@ class RowType(DataType):
for n, f, c in zip(self.names, self.fields,
self._need_conversion))
elif isinstance(obj, Row) and hasattr(obj, "_fields"):
return (obj.get_row_kind().value,) + tuple(
- f.to_sql_type(obj.get(n)) if c else obj.get(n)
+ f.to_sql_type(obj[n]) if c else obj[n]
for n, f, c in zip(self.names, self.fields,
self._need_conversion))
elif isinstance(obj, Row):
return (obj.get_row_kind().value, ) + tuple(
@@ -1931,6 +1931,12 @@ def _to_java_data_type(data_type: DataType):
else:
j_data_type = j_data_type.notNull()
+ if data_type._conversion_cls:
+ j_data_type = j_data_type.bridgedTo(
+
gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.ReflectionUtil
+ .classForName(data_type._conversion_cls)
+ )
+
return j_data_type
@@ -2581,7 +2587,8 @@ class DataTypes(object):
This is a shortcut for
``DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision, nullable)``.
:param precision: int, the number of digits of fractional seconds.
- It must have a value between 0 and 9 (both
inclusive). (default: 6)
+ It must have a value between 0 and 9 (both
inclusive). (default: 6, only
+ supports 3 when bridged to DataStream)
:param nullable: boolean, whether the type can be null (None) or not.
.. seealso::
:func:`~DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision, nullable)`
diff --git a/flink-python/pyflink/testing/test_case_utils.py
b/flink-python/pyflink/testing/test_case_utils.py
index 2ddc2b73707..5ea9cf80987 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -283,6 +283,27 @@ class TestEnv(object):
return result
+DATE_EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+TIME_EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 3
+
+
+def _date_to_millis(d: datetime.date):
+ return (d.toordinal() - DATE_EPOCH_ORDINAL) * 86400 * 1000
+
+
+def _time_to_millis(t: datetime.time):
+ if t.tzinfo is not None:
+ offset = t.utcoffset()
+ offset = offset if offset else datetime.timedelta()
+ offset_millis = \
+ (offset.days * 86400 + offset.seconds) * 10 ** 3 +
offset.microseconds // 1000
+ else:
+ offset_millis = TIME_EPOCH_ORDINAL
+ minutes = t.hour * 60 + t.minute
+ seconds = minutes * 60 + t.second
+ return seconds * 10 ** 3 + t.microsecond // 1000 - offset_millis
+
+
def to_java_data_structure(value):
jvm = get_gateway().jvm
if isinstance(value, (int, float, str, bytes)):
@@ -290,12 +311,10 @@ def to_java_data_structure(value):
elif isinstance(value, Decimal):
return jvm.java.math.BigDecimal.valueOf(float(value))
elif isinstance(value, datetime.datetime):
- local_date_time = jvm.java.time.LocalDateTime.of(
- value.year, value.month, value.day, value.hour, value.minute,
value.second,
- value.microsecond * 1000
- )
if value.tzinfo is None:
- return local_date_time
+ return jvm.java.sql.Timestamp(
+ _date_to_millis(value.date()) + _time_to_millis(value.time())
+ )
return jvm.java.time.Instant.ofEpochMilli(
(
calendar.timegm(value.utctimetuple()) +
@@ -304,12 +323,11 @@ def to_java_data_structure(value):
value.microsecond // 1000
)
elif isinstance(value, datetime.date):
- return jvm.java.time.LocalDate.of(value.year, value.month, value.day)
+ return jvm.java.sql.Date(_date_to_millis(value))
elif isinstance(value, datetime.time):
- return jvm.java.time.LocalTime.of(value.hour, value.minute,
value.second,
- value.microsecond * 1000)
+ return jvm.java.sql.Time(_time_to_millis(value))
elif isinstance(value, Time):
- return jvm.java.time.LocalTime.of()
+ return jvm.java.sql.Time(value.to_milliseconds())
elif isinstance(value, Instant):
return jvm.java.time.Instant.ofEpochMilli(value.to_epoch_milli())
elif isinstance(value, (list, tuple)):
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
index 2cde298ce55..b717d64b606 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java
@@ -418,7 +418,7 @@ public class PythonTypeUtils {
InternalTypeInfo<?> internalTypeInfo, ClassLoader
userClassLoader) {
ArrayType arrayType = (ArrayType) internalTypeInfo.toLogicalType();
return FlinkFnApi.TypeInfo.newBuilder()
- .setTypeName(FlinkFnApi.TypeInfo.TypeName.LIST)
+ .setTypeName(FlinkFnApi.TypeInfo.TypeName.OBJECT_ARRAY)
.setCollectionElementType(
toTypeInfoProto(
InternalTypeInfo.of(arrayType.getElementType()),