This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e97ae3549f77578b36ee9d6da97d98fc6e76fa4
Author: Tartarus0zm <[email protected]>
AuthorDate: Wed Sep 20 11:55:39 2023 +0800

    [FLINK-33050][table] Atomicity is not supported prompting the user to disable
---
 .../table/api/internal/TableEnvironmentImpl.java   | 36 ++++++++++++++--------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1b9648f6cb6..d25df607fd1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -143,6 +143,7 @@ import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
  */
 @Internal
 public class TableEnvironmentImpl implements TableEnvironmentInternal {
+
     // Flag that tells if the TableSource/TableSink used in this environment 
is stream table
     // source/sink,
     // and this should always be true. This avoids too many hard code.
@@ -951,18 +952,29 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                     createTableOperation.getTableIdentifier(),
                     catalogTable,
                     createTableOperation.isTemporary())) {
-                DynamicTableSink dynamicTableSink =
-                        ExecutableOperationUtils.createDynamicTableSink(
-                                catalog,
-                                () -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
-                                createTableOperation.getTableIdentifier(),
-                                catalogTable,
-                                Collections.emptyMap(),
-                                tableConfig,
-                                resourceManager.getUserClassLoader(),
-                                createTableOperation.isTemporary());
-                if (dynamicTableSink instanceof SupportsStaging) {
-                    return Optional.of(dynamicTableSink);
+                try {
+                    DynamicTableSink dynamicTableSink =
+                            ExecutableOperationUtils.createDynamicTableSink(
+                                    catalog,
+                                    () -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
+                                    createTableOperation.getTableIdentifier(),
+                                    catalogTable,
+                                    Collections.emptyMap(),
+                                    tableConfig,
+                                    resourceManager.getUserClassLoader(),
+                                    createTableOperation.isTemporary());
+                    if (dynamicTableSink instanceof SupportsStaging) {
+                        return Optional.of(dynamicTableSink);
+                    }
+                } catch (Exception e) {
+                    throw new TableException(
+                            String.format(
+                                    "Fail to create DynamicTableSink for the 
table %s, "
+                                            + "maybe the table does not 
support atomicity of CTAS/RTAS, "
+                                            + "please set %s to false and try 
again.",
+                                    createTableOperation.getTableIdentifier(),
+                                    
TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key()),
+                            e);
                 }
             }
         }

Reply via email to