This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cf3beff586a [FLINK-28876][python][format/orc] Support Orc format
cf3beff586a is described below
commit cf3beff586a088c20833d8a8fc72bbc37d87a2b1
Author: Juntao Hu <[email protected]>
AuthorDate: Mon Aug 8 23:23:44 2022 +0800
[FLINK-28876][python][format/orc] Support Orc format
This closes #20505.
---
.../docs/connectors/datastream/filesystem.md | 30 ++++
.../docs/connectors/datastream/filesystem.md | 30 ++++
flink-python/pom.xml | 8 ++
flink-python/pyflink/datastream/__init__.py | 3 +
flink-python/pyflink/datastream/formats/orc.py | 100 +++++++++++++
flink-python/pyflink/datastream/formats/parquet.py | 4 +-
.../pyflink/datastream/formats/tests/test_orc.py | 156 +++++++++++++++++++++
flink-python/pyflink/datastream/utils.py | 8 ++
8 files changed, 337 insertions(+), 2 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md
b/docs/content.zh/docs/connectors/datastream/filesystem.md
index f5468862a53..d48e84f52d8 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -811,6 +811,36 @@ class PersonVectorizer(schema: String) extends
Vectorizer[Person](schema) {
{{< /tab >}}
{{< /tabs >}}
+PyFlink 用户可以使用 `OrcBulkWriters.for_row_type` 来创建将 `Row` 数据写入 Orc 文件的
`BulkWriterFactory` 。
+注意如果 sink 的前置算子的输出类型为 `RowData` ,例如 CSV source ,则需要先转换为 `Row` 类型。
+
+{{< py_download_link "orc" >}}
+
+```python
+row_type = DataTypes.ROW([
+ DataTypes.FIELD('name', DataTypes.STRING()),
+ DataTypes.FIELD('age', DataTypes.INT()),
+])
+row_type_info = Types.ROW_NAMED(
+ ['name', 'age'],
+ [Types.STRING(), Types.INT()]
+)
+
+sink = FileSink.for_bulk_format(
+ OUTPUT_DIR,
+ OrcBulkWriters.for_row_type(
+ row_type=row_type,
+ writer_properties=Configuration(),
+ hadoop_config=Configuration(),
+ )
+).build()
+
+# 如果 ds 是产生 RowData 的数据源,可以使用一个 map 函数来指定其对应的 Row 类型。
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# 否则
+ds.sink_to(sink)
+```
+
<a name="hadoop-sequencefile-format"></a>
##### Hadoop SequenceFile Format
diff --git a/docs/content/docs/connectors/datastream/filesystem.md
b/docs/content/docs/connectors/datastream/filesystem.md
index f5a4517aa95..41eb054015e 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -816,6 +816,36 @@ class PersonVectorizer(schema: String) extends
Vectorizer[Person](schema) {
{{< /tab >}}
{{< /tabs >}}
+For PyFlink users, `OrcBulkWriters.for_row_type` can be used to create a `BulkWriterFactory` that writes `Row` records to files in Orc format.
+Note that if the operator preceding the sink produces `RowData` records (e.g. a CSV source), those records must be converted to `Row` records before being written to the sink.
+
+{{< py_download_link "orc" >}}
+
+```python
+row_type = DataTypes.ROW([
+ DataTypes.FIELD('name', DataTypes.STRING()),
+ DataTypes.FIELD('age', DataTypes.INT()),
+])
+row_type_info = Types.ROW_NAMED(
+ ['name', 'age'],
+ [Types.STRING(), Types.INT()]
+)
+
+sink = FileSink.for_bulk_format(
+ OUTPUT_DIR,
+ OrcBulkWriters.for_row_type(
+ row_type=row_type,
+ writer_properties=Configuration(),
+ hadoop_config=Configuration(),
+ )
+).build()
+
+# If ds is a source stream producing RowData records, an identity map can be added to convert the RowData records into Row records.
+ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+# Else
+ds.sink_to(sink)
+```
+
##### Hadoop SequenceFile format
To use the `SequenceFile` bulk encoder in your application you need to add the
following dependency:
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index f1a61a9eff6..5922e98a7e4 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -247,6 +247,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <!-- Indirectly accessed in pyflink_gateway_server -->
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-orc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
diff --git a/flink-python/pyflink/datastream/__init__.py
b/flink-python/pyflink/datastream/__init__.py
index 6684a7120c0..436e8677666 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -236,6 +236,9 @@ Classes to define formats used together with source & sink:
- :class:`formats.parquet.AvroParquetWriters`:
Convenience builder to create ParquetWriterFactory instances for Avro
types. Only
GenericRecord is supported in PyFlink.
+ - :class:`formats.orc.OrcBulkWriters`:
+ Convenient builder to create a :class:`BulkWriterFactory` that writes
Row records with a
+ defined :class:`RowType` into Orc files.
Other important classes:
diff --git a/flink-python/pyflink/datastream/formats/orc.py
b/flink-python/pyflink/datastream/formats/orc.py
new file mode 100644
index 00000000000..a34093d8fa7
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/orc.py
@@ -0,0 +1,100 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Optional
+
+from pyflink.common import Configuration
+from pyflink.datastream.connectors.file_system import BulkWriterFactory,
RowDataBulkWriterFactory
+from pyflink.datastream.utils import create_hadoop_configuration,
create_java_properties
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import _to_java_data_type, RowType
+from pyflink.util.java_utils import to_jarray
+
+# Public API of this module.
+__all__ = ['OrcBulkWriters']
+
+
+class OrcBulkWriters(object):
+    """
+    Convenient builder to create a :class:`~connectors.file_system.BulkWriterFactory` that writes
+    Row records with a defined RowType into Orc files in a batch fashion.
+
+    .. versionadded:: 1.16.0
+    """
+
+    @staticmethod
+    def for_row_type(row_type: RowType,
+                     writer_properties: Optional[Configuration] = None,
+                     hadoop_config: Optional[Configuration] = None) \
+            -> BulkWriterFactory:
+        """
+        Create a RowDataBulkWriterFactory that writes Row records with a defined RowType into Orc
+        files in a batch fashion.
+
+        Example:
+        ::
+
+            >>> row_type = DataTypes.ROW([
+            ...     DataTypes.FIELD('string', DataTypes.STRING()),
+            ...     DataTypes.FIELD('int_array', DataTypes.ARRAY(DataTypes.INT()))
+            ... ])
+            >>> row_type_info = Types.ROW_NAMED(
+            ...     ['string', 'int_array'],
+            ...     [Types.STRING(), Types.LIST(Types.INT())]
+            ... )
+            >>> sink = FileSink.for_bulk_format(
+            ...     OUTPUT_DIR, OrcBulkWriters.for_row_type(
+            ...         row_type=row_type,
+            ...         writer_properties=Configuration(),
+            ...         hadoop_config=Configuration(),
+            ...     )
+            ... ).build()
+            >>> ds.map(lambda e: e, output_type=row_type_info).sink_to(sink)
+
+        Note that in the above example, an identity map to indicate its RowTypeInfo is necessary
+        before ``sink_to`` when ``ds`` is a source stream producing **RowData** records,
+        because RowDataBulkWriterFactory assumes the input record type is Row.
+
+        :param row_type: RowType of the records to write; the Orc schema is derived from it.
+        :param writer_properties: Orc writer properties, converted into a
+            ``java.util.Properties``. Defaults to an empty Configuration.
+        :param hadoop_config: Hadoop configuration handed to the Orc writer (converted with
+            ``create_hadoop_configuration``). Defaults to an empty Configuration.
+        :return: A BulkWriterFactory to be used with ``FileSink.for_bulk_format``.
+        :raises TypeError: If ``row_type`` is not an instance of RowType.
+        """
+        if not isinstance(row_type, RowType):
+            raise TypeError('row_type must be an instance of RowType')
+
+        jvm = get_gateway().jvm
+        j_row_type = _to_java_data_type(row_type).getLogicalType()
+        # The vectorizer takes the field LogicalTypes as a Java array.
+        orc_types = to_jarray(
+            jvm.org.apache.flink.table.types.logical.LogicalType,
+            list(j_row_type.getChildren())
+        )
+        # Orc schema derived from the Flink row type; its string form feeds the vectorizer.
+        type_description = jvm.org.apache.flink.orc \
+            .OrcSplitReaderUtil.logicalTypeToOrcType(j_row_type)
+        if writer_properties is None:
+            writer_properties = Configuration()
+        if hadoop_config is None:
+            hadoop_config = Configuration()
+
+        return RowDataBulkWriterFactory(
+            jvm.org.apache.flink.orc.writer.OrcBulkWriterFactory(
+                jvm.org.apache.flink.orc.vector.RowDataVectorizer(
+                    type_description.toString(),
+                    orc_types
+                ),
+                create_java_properties(writer_properties),
+                create_hadoop_configuration(hadoop_config)
+            ),
+            row_type
+        )
diff --git a/flink-python/pyflink/datastream/formats/parquet.py
b/flink-python/pyflink/datastream/formats/parquet.py
index 687bc35aa4d..5d1164b82b8 100644
--- a/flink-python/pyflink/datastream/formats/parquet.py
+++ b/flink-python/pyflink/datastream/formats/parquet.py
@@ -74,8 +74,8 @@ class AvroParquetReaders(object):
class AvroParquetWriters(object):
"""
- Convenient builder to create ParquetWriterFactory instances for Avro
types. Only GenericRecord
- is supported in PyFlink.
+ Convenient builder to create Parquet BulkWriterFactory instances for Avro
types.
+ Only GenericRecord is supported at present.
.. versionadded:: 1.16.0
"""
diff --git a/flink-python/pyflink/datastream/formats/tests/test_orc.py
b/flink-python/pyflink/datastream/formats/tests/test_orc.py
new file mode 100644
index 00000000000..77ce3b085de
--- /dev/null
+++ b/flink-python/pyflink/datastream/formats/tests/test_orc.py
@@ -0,0 +1,156 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import glob
+import os
+import tempfile
+import unittest
+from datetime import date, datetime
+from decimal import Decimal
+from typing import List, Optional, Tuple
+
+import pandas as pd
+
+from pyflink.common import Row
+from pyflink.common.typeinfo import RowTypeInfo, Types
+from pyflink.datastream import DataStream
+from pyflink.datastream.connectors.file_system import FileSink
+from pyflink.datastream.formats.orc import OrcBulkWriters
+from pyflink.datastream.formats.tests.test_parquet import
_create_parquet_array_row_and_data, \
+ _check_parquet_array_results, _create_parquet_map_row_and_data,
_check_parquet_map_results
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import RowType, DataTypes
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase,
to_java_data_structure
+
+
+@unittest.skipIf(os.environ.get('HADOOP_CLASSPATH') is None,
+                 'Some Hadoop lib is needed for Orc format tests')
+class FileSinkOrcBulkWritersTests(PyFlinkStreamingTestCase):
+    """Tests writing Row records to Orc files through FileSink and OrcBulkWriters."""
+
+    def setUp(self):
+        super().setUp()
+        # Single writer subtask so all rows end up in files read back by _read_orc_file.
+        self.env.set_parallelism(1)
+        self.orc_dir_name = tempfile.mkdtemp(dir=self.tempdir)
+
+    def test_orc_basic_write(self):
+        row_type, row_type_info, data = _create_orc_basic_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data)
+        self.env.execute('test_orc_basic_write')
+        results = self._read_orc_file()
+        _check_orc_basic_results(self, results)
+
+    def test_orc_array_write(self):
+        (
+            row_type,
+            row_type_info,
+            conversion_row_type_info,
+            data,
+        ) = _create_parquet_array_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data, conversion_row_type_info)
+        self.env.execute('test_orc_array_write')
+        results = self._read_orc_file()
+        _check_parquet_array_results(self, results)
+
+    def test_orc_map_write(self):
+        row_type, row_type_info, data = _create_parquet_map_row_and_data()
+        self._build_orc_job(row_type, row_type_info, data)
+        self.env.execute('test_orc_map_write')
+        results = self._read_orc_file()
+        _check_parquet_map_results(self, results)
+
+    def _build_orc_job(
+        self,
+        row_type: RowType,
+        row_type_info: RowTypeInfo,
+        data: List[Row],
+        conversion_type_info: Optional[RowTypeInfo] = None,
+    ):
+        """
+        Build (but do not execute) a job that emits ``data`` from a Java collection and sinks it
+        into Orc files under ``self.orc_dir_name``.
+
+        :param row_type: RowType passed to ``OrcBulkWriters.for_row_type``.
+        :param row_type_info: Type information of the source collection.
+        :param data: Rows to write.
+        :param conversion_type_info: If given, an identity map with this output type is inserted
+            between the source and the sink.
+        """
+        jvm = get_gateway().jvm
+        sink = FileSink.for_bulk_format(
+            self.orc_dir_name, OrcBulkWriters.for_row_type(row_type)
+        ).build()
+        j_list = jvm.java.util.ArrayList()
+        for d in data:
+            j_list.add(to_java_data_structure(d))
+        ds = DataStream(self.env._j_stream_execution_environment.fromCollection(
+            j_list,
+            row_type_info.get_java_type_info()
+        ))
+        if conversion_type_info:
+            ds = ds.map(lambda e: e, output_type=conversion_type_info)
+        ds.sink_to(sink)
+
+    def _read_orc_file(self):
+        """Read every Orc file under the output directory and return its rows as pandas Series."""
+        records = []
+        # The sink writes into sub-directories, so search recursively and skip the directories
+        # themselves. glob's '*' does not match dot-prefixed names.
+        pattern = os.path.join(self.orc_dir_name, '**', '*')
+        for file in sorted(glob.glob(pattern, recursive=True)):
+            if not os.path.isfile(file):
+                continue
+            df = pd.read_orc(file)
+            records.extend(df.iloc[i] for i in range(len(df)))
+        return records
+
+
+def _create_orc_basic_row_and_data() -> Tuple[RowType, RowTypeInfo, List[Row]]:
+    """
+    Build the basic-types fixture: a RowType with scalar, temporal and binary fields, the
+    matching RowTypeInfo used for the source collection, and a single sample Row.
+    """
+    jvm = get_gateway().jvm
+    row_type = DataTypes.ROW([
+        DataTypes.FIELD('char', DataTypes.CHAR(10)),
+        DataTypes.FIELD('varchar', DataTypes.VARCHAR(10)),
+        DataTypes.FIELD('bytes', DataTypes.BYTES()),
+        DataTypes.FIELD('boolean', DataTypes.BOOLEAN()),
+        DataTypes.FIELD('decimal', DataTypes.DECIMAL(2, 0)),
+        DataTypes.FIELD('int', DataTypes.INT()),
+        DataTypes.FIELD('bigint', DataTypes.BIGINT()),
+        DataTypes.FIELD('double', DataTypes.DOUBLE()),
+        DataTypes.FIELD('date', DataTypes.DATE()),
+        DataTypes.FIELD('timestamp', DataTypes.TIMESTAMP(3)),
+    ])
+    row_type_info = Types.ROW_NAMED(
+        ['char', 'varchar', 'bytes', 'boolean', 'decimal', 'int', 'bigint', 'double',
+         'date', 'timestamp'],
+        [Types.STRING(), Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE()), Types.BOOLEAN(),
+         Types.BIG_DEC(), Types.INT(), Types.LONG(), Types.DOUBLE(),
+         # 'date' is a DATE field holding a date value, so its Java type is LocalDate
+         # (LocalTime was a typo).
+         Types.JAVA(jvm.java.time.LocalDate), Types.JAVA(jvm.java.time.LocalDateTime)]
+    )
+    data = [Row(
+        char='char',
+        varchar='varchar',
+        bytes=b'varbinary',
+        boolean=True,
+        # Scale 0: the checker expects this to be read back as 2.
+        decimal=Decimal(1.5),
+        int=2147483647,
+        bigint=-9223372036854775808,
+        double=2e-308,
+        date=date(1970, 1, 1),
+        timestamp=datetime(1970, 1, 2, 3, 4, 5, 600000),
+    )]
+    return row_type, row_type_info, data
+
+
+def _check_orc_basic_results(test, results):
+    """
+    Assert that the first record read back matches the sample row of
+    ``_create_orc_basic_row_and_data``.
+
+    :param test: The TestCase used for assertions.
+    :param results: Rows (pandas Series) read from the written Orc files.
+    """
+    row = results[0]
+    # CHAR(10) values are padded with spaces to the declared length: 'char' + 6 spaces.
+    test.assertEqual(row['char'], b'char      ')
+    test.assertEqual(row['varchar'], 'varchar')
+    test.assertEqual(row['bytes'], b'varbinary')
+    test.assertEqual(row['boolean'], True)
+    # Decimal(1.5) stored as DECIMAL(2, 0) comes back as 2.
+    test.assertAlmostEqual(row['decimal'], 2)
+    test.assertEqual(row['int'], 2147483647)
+    test.assertEqual(row['bigint'], -9223372036854775808)
+    test.assertAlmostEqual(row['double'], 2e-308, delta=1e-311)
+    test.assertEqual(row['date'], date(1970, 1, 1))
+    test.assertEqual(
+        row['timestamp'].to_pydatetime(),
+        datetime(1970, 1, 2, 3, 4, 5, 600000),
+    )
diff --git a/flink-python/pyflink/datastream/utils.py
b/flink-python/pyflink/datastream/utils.py
index f1bdb186177..efbdd1b5da1 100644
--- a/flink-python/pyflink/datastream/utils.py
+++ b/flink-python/pyflink/datastream/utils.py
@@ -51,6 +51,14 @@ def create_hadoop_configuration(config: Configuration):
return hadoop_config
+def create_java_properties(config: Configuration):
+    """
+    Copy every key/value pair of a PyFlink Configuration into a new ``java.util.Properties``.
+
+    :param config: The Configuration whose entries are copied.
+    :return: A Java Properties object holding the same entries.
+    """
+    jvm = get_gateway().jvm
+    properties = jvm.java.util.Properties()
+    for k, v in config.to_dict().items():
+        # setProperty is the String-only API; Hashtable.put is discouraged on Properties.
+        properties.setProperty(k, v)
+    return properties
+
+
def convert_to_python_obj(data, type_info):
if type_info == Types.PICKLED_BYTE_ARRAY():
return pickle.loads(data)