This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5f2a01073a6324bf01f5422cac7543352c26914 Author: Timo Walther <[email protected]> AuthorDate: Wed Jul 7 16:02:29 2021 +0200 [hotfix][table-api-java] Improve JavaDocs for DML behavior --- .../java/org/apache/flink/table/api/Table.java | 45 ++++++++++++++++---- .../apache/flink/table/api/TableEnvironment.java | 49 +++++++++++++++++----- 2 files changed, 75 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index e5a75a9..bc1a292 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -1322,11 +1323,18 @@ public interface Table { * <p>Example: * * <pre>{@code - * Table table = tableEnv.fromQuery("select * from MyTable"); - * TableResult tableResult = table.executeInsert("MySink"); - * tableResult... + * Table table = tableEnv.sqlQuery("SELECT * FROM MyTable"); + * TableResult tableResult = table.executeInsert("MySinkTable"); + * tableResult.await(); * }</pre> * + * <p>If multiple pipelines should insert data into one or more sink tables as part of a single + * execution, use a {@link StatementSet} (see {@link TableEnvironment#createStatementSet()}). + * + * <p>By default, all insertion operations are executed asynchronously. Use {@link + * TableResult#await()} or {@link TableResult#getJobClient()} to monitor the execution. Set + * {@link TableConfigOptions#TABLE_DML_SYNC} for always synchronous execution. 
+ * * @param tablePath The path of the registered table (backed by a {@link DynamicTableSink}). * @return The insert operation execution result. */ @@ -1343,11 +1351,18 @@ public interface Table { * <p>Example: * * <pre>{@code - * Table table = tableEnv.fromQuery("select * from MyTable"); - * TableResult tableResult = table.executeInsert("MySink", true); - * tableResult... + * Table table = tableEnv.sqlQuery("SELECT * FROM MyTable"); + * TableResult tableResult = table.executeInsert("MySinkTable", true); + * tableResult.await(); * }</pre> * + * <p>If multiple pipelines should insert data into one or more sink tables as part of a single + * execution, use a {@link StatementSet} (see {@link TableEnvironment#createStatementSet()}). + * + * <p>By default, all insertion operations are executed asynchronously. Use {@link + * TableResult#await()} or {@link TableResult#getJobClient()} to monitor the execution. Set + * {@link TableConfigOptions#TABLE_DML_SYNC} for always synchronous execution. + * * @param tablePath The path of the registered table (backed by a {@link DynamicTableSink}). * @param overwrite The flag that indicates whether the insert should overwrite existing data or * not. @@ -1380,6 +1395,13 @@ public interface Table { * .build()); * }</pre> * + * <p>If multiple pipelines should insert data into one or more sink tables as part of a single + * execution, use a {@link StatementSet} (see {@link TableEnvironment#createStatementSet()}). + * + * <p>By default, all insertion operations are executed asynchronously. Use {@link + * TableResult#await()} or {@link TableResult#getJobClient()} to monitor the execution. Set + * {@link TableConfigOptions#TABLE_DML_SYNC} for always synchronous execution. + * * @param descriptor Descriptor describing the sink table into which data should be inserted. 
*/ TableResult executeInsert(TableDescriptor descriptor); @@ -1409,6 +1431,13 @@ public interface Table { * .build(), true); * }</pre> * + * <p>If multiple pipelines should insert data into one or more sink tables as part of a single + * execution, use a {@link StatementSet} (see {@link TableEnvironment#createStatementSet()}). + * + * <p>By default, all insertion operations are executed asynchronously. Use {@link + * TableResult#await()} or {@link TableResult#getJobClient()} to monitor the execution. Set + * {@link TableConfigOptions#TABLE_DML_SYNC} for always synchronous execution. + * * @param descriptor Descriptor describing the sink table into which data should be inserted. * @param overwrite Indicates whether existing data should be overwritten. */ @@ -1418,9 +1447,9 @@ public interface Table { * Collects the contents of the current table local client. * * <pre>{@code - * Table table = tableEnv.fromQuery("select * from MyTable"); + * Table table = tableEnv.sqlQuery("SELECT * FROM MyTable"); * TableResult tableResult = table.execute(); - * tableResult... 
+ * tableResult.print(); * }</pre> */ TableResult execute(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index 8117682..a5cf949 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; @@ -730,8 +731,12 @@ public interface TableEnvironment { * Table tab = tableEnv.from("catalogName.`db.Name`.Table"); * }</pre> * + * <p>Note that the returned {@link Table} is an API object and only contains a pipeline + * description. It actually corresponds to a <i>view</i> in SQL terms. Call {@link + * Table#execute()} to trigger an execution. + * * @param path The path of a table API object to scan. - * @return Either a table or virtual table (=view). + * @return The {@link Table} object describing the pipeline for further transformations. * @see TableEnvironment#useCatalog(String) * @see TableEnvironment#useDatabase(String) */ @@ -756,6 +761,12 @@ public interface TableEnvironment { * .build()) * .build()); * }</pre> + * + * <p>Note that the returned {@link Table} is an API object and only contains a pipeline + * description. It actually corresponds to a <i>view</i> in SQL terms. Call {@link + * Table#execute()} to trigger an execution. + * + * @return The {@link Table} object describing the pipeline for further transformations. 
*/ Table from(TableDescriptor descriptor); @@ -977,31 +988,47 @@ public interface TableEnvironment { String[] getCompletionHints(String statement, int position); /** - * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}. + * Evaluates a SQL query on registered tables and returns a {@link Table} object describing the + * pipeline for further transformations. * - * <p>All tables referenced by the query must be registered in the TableEnvironment. A {@link - * Table} is automatically registered when its {@link Table#toString()} method is called, for - * example when it is embedded into a String. Hence, SQL queries can directly reference a {@link - * Table} as follows: + * <p>All tables and other objects referenced by the query must be registered in the {@link + * TableEnvironment}. For example, use {@link #createTemporaryView(String, Table)}) for + * referencing a {@link Table} object or {@link #createTemporarySystemFunction(String, Class)} + * for functions. + * + * <p>Alternatively, a {@link Table} object is automatically registered when its {@link + * Table#toString()} method is called, for example when it is embedded into a string. Hence, SQL + * queries can directly reference a {@link Table} object inline (i.e. anonymous) as follows: * * <pre>{@code * Table table = ...; * String tableName = table.toString(); * // the table is not registered to the table environment - * tEnv.sqlQuery("SELECT * FROM tableName"); + * tEnv.sqlQuery("SELECT * FROM " + tableName + " WHERE a > 12"); * }</pre> * + * <p>Note that the returned {@link Table} is an API object and only contains a pipeline + * description. It actually corresponds to a <i>view</i> in SQL terms. Call {@link + * Table#execute()} to trigger an execution or use {@link #executeSql(String)} directly. + * * @param query The SQL query to evaluate. - * @return The result of the query as Table + * @return The {@link Table} object describing the pipeline for further transformations. 
*/ Table sqlQuery(String query); /** - * Execute the given single statement, and return the execution result. + * Executes the given single statement and returns the execution result. * * <p>The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method - * returns TableResult once the job has been submitted. For DDL and DCL statements, TableResult - * is returned once the operation has finished. + * returns {@link TableResult} once the job has been submitted. For DDL and DCL statements, + * {@link TableResult} is returned once the operation has finished. + * + * <p>If multiple pipelines should insert data into one or more sink tables as part of a single + * execution, use a {@link StatementSet} (see {@link TableEnvironment#createStatementSet()}). + * + * <p>By default, all DML operations are executed asynchronously. Use {@link + * TableResult#await()} or {@link TableResult#getJobClient()} to monitor the execution. Set + * {@link TableConfigOptions#TABLE_DML_SYNC} for always synchronous execution. * * @return content for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count for `DML` (-1 means * unknown), or a string message ("OK") for other statements.