This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e97ae3549f77578b36ee9d6da97d98fc6e76fa4 Author: Tartarus0zm <[email protected]> AuthorDate: Wed Sep 20 11:55:39 2023 +0800 [FLINK-33050][table] Atomicity is not supported prompting the user to disable --- .../table/api/internal/TableEnvironmentImpl.java | 36 ++++++++++++++-------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1b9648f6cb6..d25df607fd1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -143,6 +143,7 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN */ @Internal public class TableEnvironmentImpl implements TableEnvironmentInternal { + // Flag that tells if the TableSource/TableSink used in this environment is stream table // source/sink, // and this should always be true. This avoids too many hard code. @@ -951,18 +952,29 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { createTableOperation.getTableIdentifier(), catalogTable, createTableOperation.isTemporary())) { - DynamicTableSink dynamicTableSink = - ExecutableOperationUtils.createDynamicTableSink( - catalog, - () -> moduleManager.getFactory((Module::getTableSinkFactory)), - createTableOperation.getTableIdentifier(), - catalogTable, - Collections.emptyMap(), - tableConfig, - resourceManager.getUserClassLoader(), - createTableOperation.isTemporary()); - if (dynamicTableSink instanceof SupportsStaging) { - return Optional.of(dynamicTableSink); + try { + DynamicTableSink dynamicTableSink = + ExecutableOperationUtils.createDynamicTableSink( + catalog, + () -> moduleManager.getFactory((Module::getTableSinkFactory)), + createTableOperation.getTableIdentifier(), + catalogTable, + Collections.emptyMap(), + tableConfig, + resourceManager.getUserClassLoader(), + createTableOperation.isTemporary()); + if (dynamicTableSink instanceof SupportsStaging) { + return Optional.of(dynamicTableSink); + } + } catch (Exception e) { + throw new TableException( + String.format( + "Fail to create DynamicTableSink for the table %s, " + + "maybe the table does not support atomicity of CTAS/RTAS, " + + "please set %s to false and try again.", + createTableOperation.getTableIdentifier(), + TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key()), + e); } } }
