WeiZhong94 commented on a change in pull request #13099:
URL: https://github.com/apache/flink/pull/13099#discussion_r468369160
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
##########
@@ -679,6 +707,24 @@ public boolean hasNext() {
};
}
+ private static Iterator<Row> filterOutRetractRows(Iterator<Row> data) {
+ LinkedHashMultiset<Row> result = LinkedHashMultiset.create();
+ while (data.hasNext()) {
+ Row element = data.next();
+ if (element.getKind() == RowKind.INSERT ||
element.getKind() == RowKind.UPDATE_AFTER) {
+ element.setKind(RowKind.INSERT);
+ result.add(element);
+ } else {
+ element.setKind(RowKind.INSERT);
+ if (!result.remove(element)) {
+ throw new RuntimeException(
+ String.format("Could not remove
element '%s', should never happen", element));
Review comment:
ditto (add a period at the end of the exception message)
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
##########
@@ -611,6 +617,21 @@ private static void readFully(ReadableByteChannel channel,
ByteBuffer dst) throw
*/
public static CustomIterator<byte[]> collectAsPandasDataFrame(Table
table, int maxArrowBatchSize) throws Exception {
checkArrowUsable();
+ boolean isRetractTable = false;
+ if (isStreamingMode(table)) {
+ StreamTableEnvironment tableEnv =
(StreamTableEnvironment) ((TableImpl) table).getTableEnvironment();
+ try {
+ tableEnv.toAppendStream(table, Row.class);
+ } catch (Throwable t) {
+ if (t.getMessage().contains("toAppendStream
doesn't support consuming update changes") ||
+ t.getMessage().contains("Table
is not an append-only table")) {
+ isRetractTable = true;
+ } else {
+ throw new RuntimeException("Failed to
determine whether the given table is append only", t);
Review comment:
Add a period at the end of the exception message?
##########
File path:
flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
##########
@@ -692,6 +738,22 @@ private static boolean isBlinkPlanner(Table table) {
}
}
+ private static boolean isStreamingMode(Table table) throws Exception {
+ TableEnvironment tableEnv = ((TableImpl)
table).getTableEnvironment();
+ if (tableEnv instanceof BatchTableEnvironment || tableEnv
instanceof BatchTableEnvImpl) {
+ return false;
+ } else if (tableEnv instanceof StreamTableEnvironment) {
+ return true;
+ } else if (tableEnv instanceof TableEnvironmentImpl) {
+ java.lang.reflect.Field isStreamingModeMethod =
TableEnvironmentImpl.class.getDeclaredField("isStreamingMode");
+ isStreamingModeMethod.setAccessible(true);
+ return (boolean) isStreamingModeMethod.get(tableEnv);
+ } else {
+ throw new RuntimeException(String.format(
+ "Could not determine the streaming mode for
table environment class %s", tableEnv.getClass()));
Review comment:
ditto (add a period at the end of the exception message)
##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -157,4 +167,5 @@ class BlinkBatchPandasConversionTests(PandasConversionTests,
class BlinkStreamPandasConversionTests(PandasConversionITTests,
PyFlinkBlinkStreamTableTestCase):
+
Review comment:
Unnecessary change (an added blank line); please revert it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]