godfreyhe commented on a change in pull request #12042:
URL: https://github.com/apache/flink/pull/12042#discussion_r422441179
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -654,13 +655,39 @@ public TableResult executeSql(String statement) {
return executeOperation(operations.get(0));
}
+ @Override
+ public StatementSet createStatementSet() {
+ return new StatementSetImpl(this);
+ }
+
@Override
public TableResult executeInternal(List<ModifyOperation> operations) {
- if (operations.size() != 1) {
- throw new TableException("Only one ModifyOperation is
supported now.");
- }
+ List<Transformation<?>> transformations = translate(operations);
+ String jobName = extractJobName(operations);
+ Pipeline pipeline = execEnv.createPipeline(transformations,
tableConfig, jobName);
+ try {
+ JobClient jobClient = execEnv.executeAsync(pipeline);
+ TableSchema.Builder builder = TableSchema.builder();
+ Object[] affectedRowCounts = new
Long[operations.size()];
+ for (int i = 0; i < operations.size(); ++i) {
+ // if only one operation, field name is
'affected_rowcount', else is 'affected_rowcount_$i'
+ String fieldName = "affected_rowcount";
Review comment:
in fact, the table qualified name may be long. in addition, we may need
to add catalog name and database name to distinguish between tables with the
same name.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]