WeiZhong94 commented on a change in pull request #12148:
URL: https://github.com/apache/flink/pull/12148#discussion_r425097800



##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
##########
@@ -588,6 +607,89 @@ private static void readFully(ReadableByteChannel channel, 
ByteBuffer dst) throw
                }
        }
 
+       /**
+        * Convert Flink table to Pandas DataFrame.
+        */
+       public static Iterator<byte[]> collectAsPandasDataFrame(Table table, 
int maxArrowBatchSize) throws Exception {
+               BufferAllocator allocator = 
getRootAllocator().newChildAllocator("collectAsPandasDataFrame", 0, 
Long.MAX_VALUE);
+               RowType rowType = (RowType) 
table.getSchema().toRowDataType().getLogicalType();
+               VectorSchemaRoot root = 
VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               ArrowStreamWriter arrowStreamWriter = new 
ArrowStreamWriter(root, null, baos);
+               arrowStreamWriter.start();
+
+               ArrowWriter arrowWriter;
+               Iterator<Row> results = table.execute().collect();
+               Iterator convertedResults;
+               if (isBlinkPlanner(table)) {
+                       arrowWriter = createRowDataArrowWriter(root, rowType);
+                       convertedResults = new Iterator<RowData>() {
+                               @Override
+                               public boolean hasNext() {
+                                       return results.hasNext();
+                               }
+
+                               @Override
+                               public RowData next() {
+                                       // The SelectTableSink of blink planner 
will convert the table schema and we
+                                       // need to keep the table schema used 
here be consistent with the converted table schema
+                                       TableSchema convertedTableSchema = 
SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema());
+                                       
DataFormatConverters.DataFormatConverter converter = 
DataFormatConverters.getConverterForDataType(convertedTableSchema.toRowDataType());
+                                       return (RowData) 
converter.toInternal(results.next());
+                               }
+                       };
+               } else {
+                       arrowWriter = createRowArrowWriter(root, rowType);
+                       convertedResults = results;
+               }
+
+               return new Iterator<byte[]>() {
+                       @Override
+                       public boolean hasNext() {
+                               return convertedResults.hasNext();
+                       }
+
+                       @Override
+                       public byte[] next() {
+                               try {
+                                       int i = 0;
+                                       while (convertedResults.hasNext() && i 
< maxArrowBatchSize) {
+                                               i++;
+                                               
arrowWriter.write(convertedResults.next());
+                                       }
+                                       arrowWriter.finish();
+                                       arrowStreamWriter.writeBatch();
+                                       return baos.toByteArray();
+                               } catch (Throwable t) {
+                                       String msg = "Failed to serialize the 
data of the table";
+                                       LOG.error(msg, t);
+                                       throw new RuntimeException(msg, t);
+                               } finally {
+                                       arrowWriter.reset();
+                                       baos.reset();
+
+                                       if (!hasNext()) {
+                                               root.close();
+                                               allocator.close();
+                                       }
+                               }
+                       }
+               };
+       }
+
+       private static boolean isBlinkPlanner(Table table) {
+               TableEnvironment tableEnv = ((TableImpl) 
table).getTableEnvironment();
+               if (tableEnv instanceof BatchTableEnvironment || tableEnv 
instanceof TableEnvImpl) {

Review comment:
       The `tableEnv instanceof BatchTableEnvironment` is unnecessary.

##########
File path: docs/dev/table/python/conversion_of_pandas.md
##########
@@ -57,3 +57,24 @@ table = t_env.from_pandas(pdf,
                           DataTypes.ROW([DataTypes.FIELD("f0", 
DataTypes.DOUBLE()),
                                          DataTypes.FIELD("f1", 
DataTypes.DOUBLE())])
 {% endhighlight %}
+
+## Convert PyFlink Table to Pandas DataFrame
+
+It also supports to convert a PyFlink Table to a Pandas DataFrame. Internally, 
it will materialize the results of the 

Review comment:
       supports to convert -> supports converting?

##########
File path: docs/dev/table/python/conversion_of_pandas.md
##########
@@ -57,3 +57,24 @@ table = t_env.from_pandas(pdf,
                           DataTypes.ROW([DataTypes.FIELD("f0", 
DataTypes.DOUBLE()),
                                          DataTypes.FIELD("f1", 
DataTypes.DOUBLE())])
 {% endhighlight %}
+
+## Convert PyFlink Table to Pandas DataFrame
+
+It also supports to convert a PyFlink Table to a Pandas DataFrame. Internally, 
it will materialize the results of the 
+table and serialize them into multiple arrow batches of Arrow columnar format 
at client side. The maximum arrow batch size

Review comment:
       arrow -> Arrow?

##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -130,6 +132,17 @@ def test_from_pandas(self):
                             "1970-01-01 00:00:00.123,[hello, 中文],1,hello,"
                             "1970-01-01 00:00:00.123,[1, 2]"])
 
+    def test_to_pandas(self):
+        table = self.t_env.from_pandas(self.pdf, self.data_type)
+        result_pdf = table.to_pandas()
+        self.assertTrue(2, len(result_pdf))

Review comment:
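       Should this be assertEqual? assertTrue(2, len(result_pdf)) always passes, because 2 is truthy and the second argument is only used as the failure message.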

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
##########
@@ -588,6 +607,89 @@ private static void readFully(ReadableByteChannel channel, 
ByteBuffer dst) throw
                }
        }
 
+       /**
+        * Convert Flink table to Pandas DataFrame.
+        */
+       public static Iterator<byte[]> collectAsPandasDataFrame(Table table, 
int maxArrowBatchSize) throws Exception {
+               BufferAllocator allocator = 
getRootAllocator().newChildAllocator("collectAsPandasDataFrame", 0, 
Long.MAX_VALUE);
+               RowType rowType = (RowType) 
table.getSchema().toRowDataType().getLogicalType();
+               VectorSchemaRoot root = 
VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               ArrowStreamWriter arrowStreamWriter = new 
ArrowStreamWriter(root, null, baos);
+               arrowStreamWriter.start();
+
+               ArrowWriter arrowWriter;
+               Iterator<Row> results = table.execute().collect();
+               Iterator convertedResults;
+               if (isBlinkPlanner(table)) {
+                       arrowWriter = createRowDataArrowWriter(root, rowType);
+                       convertedResults = new Iterator<RowData>() {
+                               @Override
+                               public boolean hasNext() {
+                                       return results.hasNext();
+                               }
+
+                               @Override
+                               public RowData next() {
+                                       // The SelectTableSink of blink planner 
will convert the table schema and we
+                                       // need to keep the table schema used 
here be consistent with the converted table schema
+                                       TableSchema convertedTableSchema = 
SelectTableSinkSchemaConverter.changeDefaultConversionClass(table.getSchema());
+                                       
DataFormatConverters.DataFormatConverter converter = 
DataFormatConverters.getConverterForDataType(convertedTableSchema.toRowDataType());

Review comment:
       Break this line as it is too long?

##########
File path: flink-python/pyflink/table/serializers.py
##########
@@ -51,3 +53,26 @@ def load_from_stream(self, stream):
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
             yield arrow_to_pandas(self._timezone, self._field_types, [batch])
+
+    def load_from_iterator(self, itor):
+        class IteratorIO(io.RawIOBase):
+            def __init__(self, itor):

Review comment:
       Add a call to super().__init__() here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to