Tartarus0zm commented on code in PR #23389:
URL: https://github.com/apache/flink/pull/23389#discussion_r1329950195


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -951,18 +957,28 @@ private Optional<DynamicTableSink> 
getSupportsStagingDynamicTableSink(
                     createTableOperation.getTableIdentifier(),
                     catalogTable,
                     createTableOperation.isTemporary())) {
-                DynamicTableSink dynamicTableSink =
-                        ExecutableOperationUtils.createDynamicTableSink(
-                                catalog,
-                                () -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
-                                createTableOperation.getTableIdentifier(),
-                                catalogTable,
-                                Collections.emptyMap(),
-                                tableConfig,
-                                resourceManager.getUserClassLoader(),
-                                createTableOperation.isTemporary());
-                if (dynamicTableSink instanceof SupportsStaging) {
-                    return Optional.of(dynamicTableSink);
+                try {
+                    DynamicTableSink dynamicTableSink =
+                            ExecutableOperationUtils.createDynamicTableSink(
+                                    catalog,
+                                    () -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
+                                    createTableOperation.getTableIdentifier(),
+                                    catalogTable,
+                                    Collections.emptyMap(),
+                                    tableConfig,
+                                    resourceManager.getUserClassLoader(),
+                                    createTableOperation.isTemporary());
+                    if (dynamicTableSink instanceof SupportsStaging) {
+                        return Optional.of(dynamicTableSink);
+                    }
+                } catch (Exception e) {
+                    String msg =
+                            String.format(
+                                    "The Table %s does not support atomicity, 
please set %s to false and try again.",
+                                    createTableOperation.getTableIdentifier(),
+                                    
TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key());
+                    LOG.error(msg, e);
+                    throw new UnsupportedOperationException(msg + " See logs 
for more details.");

Review Comment:
   thanks for your review!  The root cause is not clear enough if it's executed 
via sql-client, so we want to be able to tell the user that this 
DynamicTableSink doesn't support atomicity and needs to be turned off.
   See [FLINK-32787](https://issues.apache.org/jira/browse/FLINK-32787) for 
details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to