godfreyhe commented on a change in pull request #11862:
URL: https://github.com/apache/flink/pull/11862#discussion_r415500318
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##########
@@ -1113,4 +1113,61 @@
* </pre>
*/
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
+
+ /**
+ * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
+ * and then execute the insert operation.
+ *
+ * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+ * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+ *
+ * <p>A batch {@link Table} can only be written to a
+ * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
+ * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
+ * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
+ * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * {@code
+ * Table table = tableEnv.fromQuery("select * from MyTable");
+ * TableResult tableResult = table.executeInsert("MySink");
+ * tableResult...
+ * }
+ * </pre>
+ *
+ * @param tablePath The path of the registered TableSink to which the Table is written.
+ * @return The insert operation execution result.
+ */
+ TableResult executeInsert(String tablePath);
+
+ /**
+ * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
+ * and then execute the insert operation.
+ *
+ * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+ * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+ *
+ * <p>A batch {@link Table} can only be written to a
+ * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
+ * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
+ * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
+ * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ * {@code
+ * Table table = tableEnv.fromQuery("select * from MyTable");
+ * TableResult tableResult = table.executeInsert("MySink", true);
+ * tableResult...
+ * }
+ * </pre>
+ *
+ * @param tablePath The path of the registered TableSink to which the Table is written.
+ * @param overwrite The flag that indicates whether the insert should overwrite existing data or not.
+ * @return The insert operation execution result.
+ */
+ TableResult executeInsert(String tablePath, boolean overwrite);
Review comment:
> IIRC not all the table sinks support overwrite. The logic of dealing with `overwrite=true` seems not complete.

Both planners already check whether a sink is an `OverwritableTableSink`.
I will add some tests for overwrite mode.
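
For readers following along, here is a minimal sketch of the kind of check referred to above. It is not the planner code from this PR. The interfaces are simplified local stand-ins that only borrow the `TableSink` and `OverwritableTableSink` names, and `applyOverwrite`, `AppendOnlySink`, `FileLikeSink`, and the exception type are hypothetical choices made for illustration.

```java
// Simplified stand-ins for illustration only; these are NOT the Flink classes.
final class OverwriteCheckSketch {

    /** Hypothetical minimal sink abstraction. */
    interface TableSink {
        String name();
    }

    /** Stand-in for a sink that can be switched into overwrite mode. */
    interface OverwritableTableSink extends TableSink {
        void setOverwrite(boolean overwrite);
    }

    /** Hypothetical sink that can only append. */
    static final class AppendOnlySink implements TableSink {
        @Override
        public String name() {
            return "AppendOnlySink";
        }
    }

    /** Hypothetical sink that supports overwriting existing data. */
    static final class FileLikeSink implements OverwritableTableSink {
        boolean overwrite;

        @Override
        public String name() {
            return "FileLikeSink";
        }

        @Override
        public void setOverwrite(boolean overwrite) {
            this.overwrite = overwrite;
        }
    }

    /**
     * Enables overwrite mode on the sink if requested, and rejects the
     * request when the sink does not support it.
     */
    static void applyOverwrite(TableSink sink, boolean overwrite) {
        if (!overwrite) {
            return; // plain insert, nothing to validate
        }
        if (sink instanceof OverwritableTableSink) {
            ((OverwritableTableSink) sink).setOverwrite(true);
        } else {
            // Assumption: a real planner would raise its own validation error here.
            throw new UnsupportedOperationException(
                "Sink " + sink.name() + " does not support overwrite");
        }
    }
}
```

With a check of this shape, `executeInsert("MySink", true)` against a sink that cannot overwrite would fail during validation instead of silently appending, which is the behaviour the tests mentioned above would need to cover.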
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.