KurtYoung commented on a change in pull request #11862: URL: https://github.com/apache/flink/pull/11862#discussion_r415193966
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.ModifyOperation; +

import java.util.List; + +/** + * An internal interface of {@link TableEnvironment} + * that defines extended methods used for {@link TableImpl}. + */ +@Internal +public interface TableEnvironmentInternal extends TableEnvironment { + + /** + * Returns a {@link Parser} that provides methods for parsing a SQL string. + * + * @return initialized {@link Parser}. + */ + Parser getParser(); + + /** + * Returns a {@link CatalogManager} that deals with all catalog objects. + */ + CatalogManager getCatalogManager(); + + /** + * Executes the given operations and returns the execution result. + * + * @param operations The operations to be executed. 
+ * @return the affected row counts (-1 means unknown). + */ + TableResult executeOperations(List<ModifyOperation> operations); Review comment: Rename to `executeInserts` to be more precise? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java ########## @@ -1113,4 +1113,61 @@ * </pre> */ FlatAggregateTable flatAggregate(Expression tableAggregateFunction); + + /** + * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path, + * and then executes the insert operation. + * + * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or + * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution. + * + * <p>A batch {@link Table} can only be written to a + * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a + * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a + * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an + * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}. + * + * <p>Example: + * + * <pre> + * {@code + * Table table = tableEnv.sqlQuery("select * from MyTable"); + * TableResult tableResult = table.executeInsert("MySink"); + * tableResult... + * } + * </pre> + * + * @param tablePath The path of the registered TableSink to which the Table is written. + * @return The insert operation execution result. + */ + TableResult executeInsert(String tablePath); + + /** + * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path, + * and then executes the insert operation. + * + * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or + * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution. 
+ * + * <p>A batch {@link Table} can only be written to a + * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a + * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a + * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an + * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}. + * + * <p>Example: + * + * <pre> + * {@code + * Table table = tableEnv.sqlQuery("select * from MyTable"); + * TableResult tableResult = table.executeInsert("MySink", true); + * tableResult... + * } + * </pre> + * + * @param tablePath The path of the registered TableSink to which the Table is written. + * @param overwrite The flag that indicates whether the insert should overwrite existing data or not. + * @return The insert operation execution result. + */ + TableResult executeInsert(String tablePath, boolean overwrite); Review comment: IIRC not all the table sinks support overwrite. The logic of dealing with `overwrite=true` seems incomplete. ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java ########## @@ -1113,4 +1113,61 @@ * </pre> */ FlatAggregateTable flatAggregate(Expression tableAggregateFunction); + + /** + * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path, + * and then executes the insert operation. + * + * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or + * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution. + * + * <p>A batch {@link Table} can only be written to a + * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a + * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a + * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an + * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}. 
+ * + * <p>Example: + * + * <pre> + * {@code + * Table table = tableEnv.sqlQuery("select * from MyTable"); + * TableResult tableResult = table.executeInsert("MySink"); + * tableResult... + * } + * </pre> + * + * @param tablePath The path of the registered TableSink to which the Table is written. + * @return The insert operation execution result. + */ + TableResult executeInsert(String tablePath); + + /** + * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path, + * and then executes the insert operation. + * + * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or + * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution. + * + * <p>A batch {@link Table} can only be written to a + * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a + * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a + * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an + * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}. + * + * <p>Example: + * + * <pre> + * {@code + * Table table = tableEnv.sqlQuery("select * from MyTable"); + * TableResult tableResult = table.executeInsert("MySink", true); + * tableResult... + * } + * </pre> + * + * @param tablePath The path of the registered TableSink to which the Table is written. + * @param overwrite The flag that indicates whether the insert should overwrite existing data or not. + * @return The insert operation execution result. + */ + TableResult executeInsert(String tablePath, boolean overwrite); Review comment: BTW, there are also no tests that set this to true ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
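The overwrite concern raised in the review can be illustrated with a small self-contained sketch. This is not Flink code: `ToySink`, the registry, and this `executeInsert` are hypothetical stand-ins that only model the behavior the reviewer asks for, namely that a sink which does not support INSERT OVERWRITE should reject `executeInsert(path, true)` instead of silently ignoring the flag. The `-1` return value mirrors the "affected row counts (-1 means unknown)" convention from the PR's javadoc.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the Flink implementation: models why
// executeInsert(tablePath, overwrite=true) needs a validation step
// for sinks that do not support overwrite semantics.
public class OverwriteSketch {

    // A toy sink: only some sinks support INSERT OVERWRITE.
    static final class ToySink {
        final boolean supportsOverwrite;

        ToySink(boolean supportsOverwrite) {
            this.supportsOverwrite = supportsOverwrite;
        }
    }

    // Stand-in for a catalog of registered sinks.
    static final Map<String, ToySink> catalog = new HashMap<>();

    static {
        catalog.put("AppendOnlySink", new ToySink(false));
        catalog.put("OverwritableSink", new ToySink(true));
    }

    // Sketch of executeInsert(tablePath, overwrite). Returns an affected
    // row count, where -1 means "unknown", as in the PR's javadoc.
    static long executeInsert(String tablePath, boolean overwrite) {
        ToySink sink = catalog.get(tablePath);
        if (sink == null) {
            throw new IllegalArgumentException("No sink registered under: " + tablePath);
        }
        if (overwrite && !sink.supportsOverwrite) {
            // This is the check the reviewer suspects is missing:
            // fail fast instead of silently dropping the flag.
            throw new UnsupportedOperationException(
                "Sink '" + tablePath + "' does not support INSERT OVERWRITE");
        }
        return -1L; // row count unknown in this sketch
    }

    // Helper for checking the rejection behavior.
    static boolean overwriteRejected(String tablePath) {
        try {
            executeInsert(tablePath, true);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overwriteRejected("AppendOnlySink"));   // true
        System.out.println(overwriteRejected("OverwritableSink")); // false
    }
}
```

A test exercising `overwrite=true` against both kinds of sink, along the lines of `overwriteRejected` above, is exactly the coverage the second review comment says is absent from the PR.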
