godfreyhe commented on a change in pull request #12042:
URL: https://github.com/apache/flink/pull/12042#discussion_r422232606
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -654,13 +655,39 @@ public TableResult executeSql(String statement) {
return executeOperation(operations.get(0));
}
+ @Override
+ public StatementSet createStatementSet() {
+ return new StatementSetImpl(this);
+ }
+
@Override
public TableResult executeInternal(List<ModifyOperation> operations) {
- if (operations.size() != 1) {
- throw new TableException("Only one ModifyOperation is supported now.");
- }
+ List<Transformation<?>> transformations = translate(operations);
+ String jobName = extractJobName(operations);
+ Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
+ try {
+ JobClient jobClient = execEnv.executeAsync(pipeline);
+ TableSchema.Builder builder = TableSchema.builder();
+ Object[] affectedRowCounts = new Long[operations.size()];
+ for (int i = 0; i < operations.size(); ++i) {
+ // if only one operation, field name is 'affected_rowcount', else is 'affected_rowcount_$i'
+ String fieldName = "affected_rowcount";
Review comment:
I think "affected_rowcount" is necessary. How about "affected_rowcount_sink_table_name"?
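
To make the suggestion concrete, here is a minimal sketch of the naming scheme I have in mind. It is not code from this PR: the class `AffectedRowCountFieldNames`, the helper `fieldNames`, and the sample sink names are all hypothetical, and it assumes each sink table name is already available as a plain string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in the PR.
class AffectedRowCountFieldNames {

    // Builds one result column name per sink by appending the sink table
    // name to the "affected_rowcount" prefix instead of a positional index.
    static List<String> fieldNames(List<String> sinkTableNames) {
        List<String> names = new ArrayList<>();
        for (String sink : sinkTableNames) {
            names.add("affected_rowcount_" + sink);
        }
        return names;
    }

    public static void main(String[] args) {
        // Sample sink names are made up for the example.
        System.out.println(fieldNames(Arrays.asList("orders_sink", "audit_sink")));
        // prints [affected_rowcount_orders_sink, affected_rowcount_audit_sink]
    }
}
```

With names like these, a user can tell which count belongs to which sink without knowing the order in which the statements were added.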