hequn8128 commented on a change in pull request #11832:
URL: https://github.com/apache/flink/pull/11832#discussion_r417135937
##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
finally:
os.unlink(temp_file.name)
+ def from_pandas(self, pdf,
+ schema: Union[RowType, List[str], Tuple[str], List[DataType],
+ Tuple[DataType]] = None,
+ splits_num: int = 1) -> Table:
+ """
+ Creates a table from a pandas DataFrame.
+
+ Example:
+ ::
+
+ # use the second parameter to specify custom field names
Review comment:
Move this comment after the creation of the DataFrame.
##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
finally:
os.unlink(temp_file.name)
+ def from_pandas(self, pdf,
+ schema: Union[RowType, List[str], Tuple[str], List[DataType],
+ Tuple[DataType]] = None,
+ splits_num: int = 1) -> Table:
+ """
+ Creates a table from a pandas DataFrame.
+
+ Example:
+ ::
+
+ # use the second parameter to specify custom field names
+ >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
+ >>> table_env.from_pandas(pdf, ["a", "b"])
+ # use the second parameter to specify custom field types
+ >>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()]))
+ # use the second parameter to specify custom table schema
+ >>> table_env.from_pandas(pdf,
+ ... DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
+ ... DataTypes.FIELD("b", DataTypes.DOUBLE())]))
+
+ :param pdf: The pandas DataFrame.
+ :param schema: The schema of the converted table.
+ :type schema: RowType or list[str] or list[DataType]
Review comment:
Duplicate type hint: the type of `schema` is already declared by the annotation in the method signature.
##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
finally:
os.unlink(temp_file.name)
+ def from_pandas(self, pdf,
+ schema: Union[RowType, List[str], Tuple[str], List[DataType],
+ Tuple[DataType]] = None,
+ splits_num: int = 1) -> Table:
+ """
+ Creates a table from a pandas DataFrame.
+
+ Example:
+ ::
+
+ # use the second parameter to specify custom field names
+ >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
+ >>> table_env.from_pandas(pdf, ["a", "b"])
+ # use the second parameter to specify custom field types
+ >>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()]))
+ # use the second parameter to specify custom table schema
+ >>> table_env.from_pandas(pdf,
+ ... DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
+ ... DataTypes.FIELD("b", DataTypes.DOUBLE())]))
+
+ :param pdf: The pandas DataFrame.
+ :param schema: The schema of the converted table.
+ :type schema: RowType or list[str] or list[DataType]
+ :param splits_num: The number of splits the given Pandas DataFrame will be split into. It
+ determines the number of parallel source tasks.
+ If not specified, the default parallelism will be used.
+ :type splits_num: int
Review comment:
Duplicate type hint: the type of `splits_num` is already declared by the annotation in the method signature.
##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
finally:
os.unlink(temp_file.name)
+ def from_pandas(self, pdf,
+ schema: Union[RowType, List[str], Tuple[str], List[DataType],
+ Tuple[DataType]] = None,
+ splits_num: int = 1) -> Table:
+ """
+ Creates a table from a pandas DataFrame.
+
+ Example:
+ ::
+
+ # use the second parameter to specify custom field names
+ >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
+ >>> table_env.from_pandas(pdf, ["a", "b"])
+ # use the second parameter to specify custom field types
+ >>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()]))
+ # use the second parameter to specify custom table schema
+ >>> table_env.from_pandas(pdf,
+ ... DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
+ ... DataTypes.FIELD("b", DataTypes.DOUBLE())]))
+
+ :param pdf: The pandas DataFrame.
+ :param schema: The schema of the converted table.
+ :type schema: RowType or list[str] or list[DataType]
+ :param splits_num: The number of splits the given Pandas DataFrame will be split into. It
+ determines the number of parallel source tasks.
+ If not specified, the default parallelism will be used.
+ :type splits_num: int
+ :return: The result table.
+ :rtype: Table
Review comment:
Duplicate type hint: the return type is already declared by the `-> Table` annotation in the method signature.
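To illustrate why the `:type` and `:rtype` fields are redundant, here is a minimal sketch using a simplified, hypothetical stand-in for the method (it is not the PR's implementation; the `schema` annotation is shortened and the return type is replaced by `str` so the snippet runs without PyFlink). The annotations alone are enough for tools to recover the types:

```python
from typing import List, Optional, get_type_hints


def from_pandas(pdf, schema: Optional[List[str]] = None,
                splits_num: int = 1) -> str:
    """
    Hypothetical, simplified stand-in for the method under review.

    :param pdf: The pandas DataFrame.
    :param schema: The schema of the converted table.
    :param splits_num: The number of splits the DataFrame will be split into.
    :return: The result table.
    """
    # Placeholder return value; the real method would return a Table.
    return "table"


# The annotations already carry the type information, so the docstring
# does not need to repeat it in separate type fields.
hints = get_type_hints(from_pandas)
print(hints["schema"])
print(hints["splits_num"])
print(hints["return"])
```

The docstring above keeps only the `:param` and `:return:` descriptions, which is presumably the shape the three "duplicate type hint" comments are asking for.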
##########
File path: docs/dev/table/python/index.md
##########
@@ -32,5 +32,6 @@ Apache Flink has provided Python Table API support since 1.9.0.
- [Installation]({{ site.baseurl }}/dev/table/python/installation.html): Introduction of how to set up the Python Table API execution environment.
- [User-defined Functions]({{ site.baseurl }}/dev/table/python/python_udfs.html): Explanation of how to define Python user-defined functions.
- [Vectorized User-defined Functions]({{ site.baseurl }}/dev/table/python/vectorized_python_udfs.html): Explanation of how to define vectorized Python user-defined functions.
+- [Conversion between PyFlink Table and Pandas DataFrame]({{ site.baseurl }}/dev/table/python/conversion_of_pandas.html): Explanation of how to convert between PyFlink Table and Pandas DataFrame.
Review comment:
Conversions?
##########
File path: docs/dev/table/python/index.zh.md
##########
@@ -32,5 +32,6 @@ Apache Flink has provided Python Table API support since 1.9.0.
- [环境安装]({{ site.baseurl }}/zh/dev/table/python/installation.html): Introduction of how to set up the Python Table API execution environment.
- [自定义函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html): Explanation of how to define Python user-defined functions.
- [自定义向量化函数]({{ site.baseurl }}/zh/dev/table/python/vectorized_python_udfs.html): Explanation of how to define vectorized Python user-defined functions.
+- [PyFlink Table和Pandas DataFrame互转]({{ site.baseurl }}/zh/dev/table/python/conversion_of_pandas.html): Explanation of how to convert between PyFlink Table and Pandas DataFrame.
Review comment:
According to most copywriting guidelines, it's better to leave a space
between an English word and a Chinese word.
##########
File path: docs/dev/table/python/conversion_of_pandas.zh.md
##########
@@ -0,0 +1,48 @@
+---
+title: "PyFlink Table和Pandas DataFrame互转"
+nav-parent_id: python_tableapi
+nav-pos: 50
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+It supports to convert between PyFlink Table and Pandas DataFrame.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Convert Pandas DataFrame to PyFlink Table
+
+It supports to create a PyFlink Table from a Pandas DataFrame. Internally, it will serialize the Pandas DataFrame
+using Arrow columnar format at client side and the serialized data will be processed and deserialized in Arrow source
+during execution. The Arrow source could also be used in streaming jobs and it will properly handle the checkpoint
+and provides the exactly once guarantees.
+
+The following example shows how to create a PyFlink Table from a Pandas DataFrame:
+
+{% highlight python %}
+import pandas as pd
+import numpy as np
+
+# Create a Pandas DataFrame
+pdf = pd.DataFrame(np.random.rand(1000, 2))
+
+# Create a PyFlink Table from a Pandas DataFrame
+table = t_env.from_pandas(pdf)
Review comment:
Maybe add more examples here, for example how to specify table names,
which is commonly required.
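As a rough sketch of what such additional examples might look like, the snippet below reuses the three schema forms from the `from_pandas` docstring in this PR. It assumes that "table names" refers to the field (column) names passed through the second parameter, and that `t_env` is an existing TableEnvironment providing the new method. The wrapper function `create_tables` is hypothetical and exists only so the snippet can be loaded without a running Flink environment:

```python
import numpy as np
import pandas as pd

# Create a Pandas DataFrame with two columns of random doubles
pdf = pd.DataFrame(np.random.rand(1000, 2))


def create_tables(t_env):
    """Create PyFlink Tables from `pdf` using different schema forms.

    `t_env` is assumed to be a TableEnvironment that offers the
    `from_pandas` method added in this PR.
    """
    # Imported lazily so the module loads even where PyFlink is absent
    from pyflink.table import DataTypes

    # Specify custom field names
    named = t_env.from_pandas(pdf, ["a", "b"])

    # Specify custom field types
    typed = t_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])

    # Specify the full table schema (field names and types)
    full = t_env.from_pandas(
        pdf,
        DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
                       DataTypes.FIELD("b", DataTypes.DOUBLE())]))
    return named, typed, full
```

In the documentation page itself these calls would presumably appear inline after the existing `t_env.from_pandas(pdf)` line rather than inside a function.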
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.