luoyuxia commented on code in PR #23389:
URL: https://github.com/apache/flink/pull/23389#discussion_r1329972324


##########
docs/content.zh/docs/dev/table/sql/create.md:
##########
@@ -615,6 +615,7 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM source_table WHERE mod(id, 1
 * 暂不支持主键约束。
 
 **注意:** 默认情况下,RTAS 是非原子性的,这意味着如果在向表中插入数据时发生错误,该表不会被自动删除或还原成原来的表。
+**注意:** 在 InMemoryCatalog 场景下,删除表只会删除元数据,并不清理底层数据,因此在重复使用 RTAS 时,建议先确保底层数据已清理,否则数据看起来是重复的。

Review Comment:
   Also, don't forget to update this according to the comment on the English version.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -951,18 +957,28 @@ private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(
                     createTableOperation.getTableIdentifier(),
                     catalogTable,
                     createTableOperation.isTemporary())) {
-                DynamicTableSink dynamicTableSink =
-                        ExecutableOperationUtils.createDynamicTableSink(
-                                catalog,
-                                () -> moduleManager.getFactory((Module::getTableSinkFactory)),
-                                createTableOperation.getTableIdentifier(),
-                                catalogTable,
-                                Collections.emptyMap(),
-                                tableConfig,
-                                resourceManager.getUserClassLoader(),
-                                createTableOperation.isTemporary());
-                if (dynamicTableSink instanceof SupportsStaging) {
-                    return Optional.of(dynamicTableSink);
+                try {
+                    DynamicTableSink dynamicTableSink =
+                            ExecutableOperationUtils.createDynamicTableSink(
+                                    catalog,
+                                    () -> moduleManager.getFactory((Module::getTableSinkFactory)),
+                                    createTableOperation.getTableIdentifier(),
+                                    catalogTable,
+                                    Collections.emptyMap(),
+                                    tableConfig,
+                                    resourceManager.getUserClassLoader(),
+                                    createTableOperation.isTemporary());
+                    if (dynamicTableSink instanceof SupportsStaging) {
+                        return Optional.of(dynamicTableSink);
+                    }
+                } catch (Exception e) {
+                    String msg =
+                            String.format(
+                                    "The Table %s does not support atomicity, please set %s to false and try again.",

Review Comment:
   Suggestion:
   `Failed to create DynamicTableSink for the table %s, maybe the table does not support atomicity of CTAS/RTAS, please set %s to false and try again.`
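
   To illustrate how the suggested wording would sit in the catch block, here is a minimal, self-contained sketch. It is not the Flink code: `Sink`, `StagingSink`, `failureMessage`, and `findStagingSink` are hypothetical stand-ins for `DynamicTableSink`, `SupportsStaging`, and the logic in `getSupportsStagingDynamicTableSink`, and the option key string is an assumption standing in for `TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key()`.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class StagingSinkLookupSketch {
    // Hypothetical stand-ins for Flink's DynamicTableSink and SupportsStaging.
    public interface Sink {}
    public interface StagingSink extends Sink {}

    // Assumed key, used here only for illustration.
    public static final String ATOMICITY_KEY = "table.rtas-ctas.atomicity-enabled";

    // Builds the error text using wording along the lines of the suggestion above.
    public static String failureMessage(String tableIdentifier, String optionKey) {
        return String.format(
                "Failed to create DynamicTableSink for the table %s, maybe the table does not "
                        + "support atomicity of CTAS/RTAS, please set %s to false and try again.",
                tableIdentifier,
                optionKey);
    }

    // Returns the sink only if it supports staging; turns any creation failure
    // into an UnsupportedOperationException that carries the hint for the user.
    public static Optional<Sink> findStagingSink(
            String tableIdentifier, Supplier<Sink> sinkFactory) {
        try {
            Sink sink = sinkFactory.get();
            if (sink instanceof StagingSink) {
                return Optional.of(sink);
            }
        } catch (Exception e) {
            String msg = failureMessage(tableIdentifier, ATOMICITY_KEY);
            // The PR logs through LOG.error(msg, e); stderr stands in for the logger here.
            System.err.println(msg + " Cause: " + e);
            throw new UnsupportedOperationException(msg + " See logs for more details.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        try {
            findStagingSink("my_catalog.my_db.my_rtas_table", () -> {
                throw new IllegalStateException("connector not found");
            });
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   The point of the longer message is that the catch block fires for any sink-creation failure, not only for a missing atomicity capability, so the text names the failure first and offers the config change as a likely remedy.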



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -951,18 +957,28 @@ private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(
                     createTableOperation.getTableIdentifier(),
                     catalogTable,
                     createTableOperation.isTemporary())) {
-                DynamicTableSink dynamicTableSink =
-                        ExecutableOperationUtils.createDynamicTableSink(
-                                catalog,
-                                () -> moduleManager.getFactory((Module::getTableSinkFactory)),
-                                createTableOperation.getTableIdentifier(),
-                                catalogTable,
-                                Collections.emptyMap(),
-                                tableConfig,
-                                resourceManager.getUserClassLoader(),
-                                createTableOperation.isTemporary());
-                if (dynamicTableSink instanceof SupportsStaging) {
-                    return Optional.of(dynamicTableSink);
+                try {
+                    DynamicTableSink dynamicTableSink =
+                            ExecutableOperationUtils.createDynamicTableSink(
+                                    catalog,
+                                    () -> moduleManager.getFactory((Module::getTableSinkFactory)),
+                                    createTableOperation.getTableIdentifier(),
+                                    catalogTable,
+                                    Collections.emptyMap(),
+                                    tableConfig,
+                                    resourceManager.getUserClassLoader(),
+                                    createTableOperation.isTemporary());
+                    if (dynamicTableSink instanceof SupportsStaging) {
+                        return Optional.of(dynamicTableSink);
+                    }
+                } catch (Exception e) {
+                    String msg =
+                            String.format(
+                                    "The Table %s does not support atomicity, please set %s to false and try again.",
+                                    createTableOperation.getTableIdentifier(),
+                                    TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key());
+                    LOG.error(msg, e);
+                    throw new UnsupportedOperationException(msg + " See logs for more details.");

Review Comment:
   +1 for this


