snuyanzin commented on code in PR #26331:
URL: https://github.com/apache/flink/pull/26331#discussion_r2009252120


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java:
##########
@@ -787,4 +827,87 @@ public Table select(Expression... fields) {
                             Arrays.asList(fields), table.operationTree, 
overWindows));
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Partitioned Table
+    // 
--------------------------------------------------------------------------------------------
+
+    private static final class PartitionedTableImpl implements 
PartitionedTable {
+
+        private final TableImpl table;
+        private final List<Expression> partitionKeys;
+
+        private PartitionedTableImpl(TableImpl table, List<Expression> 
partitionKeys) {
+            this.table = table;
+            this.partitionKeys = partitionKeys;
+        }
+
+        @Override
+        public ApiExpression asArgument(String name) {
+            return createArgumentExpression(
+                    createPartitionQueryOperation(), table.tableEnvironment, 
name);
+        }
+
+        @Override
+        public Table process(String path, Object... arguments) {
+            return table.tableEnvironment.fromCall(
+                    path,
+                    unionTableAndArguments(
+                            createPartitionQueryOperation(), 
table.tableEnvironment, arguments));
+        }
+
+        @Override
+        public Table process(Class<? extends UserDefinedFunction> function, 
Object... arguments) {
+            return table.tableEnvironment.fromCall(
+                    function,
+                    unionTableAndArguments(
+                            createPartitionQueryOperation(), 
table.tableEnvironment, arguments));
+        }
+
+        private QueryOperation createPartitionQueryOperation() {
+            return table.operationTreeBuilder.partition(partitionKeys, 
table.operationTree);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Shared methods
+    // 
--------------------------------------------------------------------------------------------
+
+    private TableImpl createTable(QueryOperation operation) {
+        return new TableImpl(tableEnvironment, operation, 
operationTreeBuilder, lookupResolver);
+    }
+
+    private List<Expression> preprocessExpressions(List<Expression> 
expressions) {
+        return preprocessExpressions(expressions.toArray(new Expression[0]));
+    }
+
+    private List<Expression> preprocessExpressions(Expression[] expressions) {
+        return Arrays.stream(expressions)
+                .map(f -> f.accept(lookupResolver))
+                .collect(Collectors.toList());
+    }
+
+    private static Object[] unionTableAndArguments(
+            QueryOperation queryOperation, TableEnvironment env, Object... 
arguments) {
+        return Stream.concat(
+                        Stream.of(ApiExpressionUtils.tableRef("ptf_arg", 
queryOperation, env)),
+                        Stream.of(arguments))
+                .toArray();
+    }
+
+    private static ApiExpression createArgumentExpression(
+            QueryOperation queryOperation, TableEnvironment env, String name) {
+        return new ApiExpression(
+                ApiExpressionUtils.unresolvedCall(
+                        BuiltInFunctionDefinitions.ASSIGNMENT,
+                        lit(name),
+                        ApiExpressionUtils.tableRef(name, queryOperation, 
env)));
+    }
+
+    private void checkCommonTableEnvironment(Table right) {
+        if (((TableImpl) right).getTableEnvironment() != tableEnvironment) {
+            throw new ValidationException(
+                    "Only tables from the same TableEnvironment can be 
joined.");
+        }
+    }

Review Comment:
   Since `Table` is `@PublicEvolving`
   couldn't it happen that someone can have a custom implementation of it?
   In this case shouldn't we first have a separate check for `instanceof` 
before casting? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to