lsyldliu commented on code in PR #23389:
URL: https://github.com/apache/flink/pull/23389#discussion_r1328339016
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -951,18 +957,28 @@ private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(
createTableOperation.getTableIdentifier(),
catalogTable,
createTableOperation.isTemporary())) {
- DynamicTableSink dynamicTableSink =
- ExecutableOperationUtils.createDynamicTableSink(
- catalog,
- () -> moduleManager.getFactory((Module::getTableSinkFactory)),
- createTableOperation.getTableIdentifier(),
- catalogTable,
- Collections.emptyMap(),
- tableConfig,
- resourceManager.getUserClassLoader(),
- createTableOperation.isTemporary());
- if (dynamicTableSink instanceof SupportsStaging) {
- return Optional.of(dynamicTableSink);
+ try {
+ DynamicTableSink dynamicTableSink =
+ ExecutableOperationUtils.createDynamicTableSink(
+ catalog,
+ () -> moduleManager.getFactory((Module::getTableSinkFactory)),
+ createTableOperation.getTableIdentifier(),
+ catalogTable,
+ Collections.emptyMap(),
+ tableConfig,
+ resourceManager.getUserClassLoader(),
+ createTableOperation.isTemporary());
+ if (dynamicTableSink instanceof SupportsStaging) {
+ return Optional.of(dynamicTableSink);
+ }
+ } catch (Exception e) {
+ String msg =
+ String.format(
+ "The Table %s does not support atomicity, please set %s to false and try again.",
+ createTableOperation.getTableIdentifier(),
+ TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED.key());
+ LOG.error(msg, e);
+ throw new UnsupportedOperationException(msg + " See logs for more details.");
Review Comment:
I think we should throw `TableException` here. We don't need to log the
exception; wrapping `e` in the `TableException` as its cause would be better.
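A minimal sketch of the shape I have in mind, not the exact change for this PR. To keep it compilable without Flink on the classpath, `TableException` below is a local stand-in for `org.apache.flink.table.api.TableException`, and `createOrWrap`, the table identifier, and the option key string are hypothetical placeholders for illustration only:

```java
import java.util.function.Supplier;

public class StagingSinkErrorSketch {

    /** Local stand-in for Flink's TableException so the sketch compiles on its own. */
    static class TableException extends RuntimeException {
        TableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Hypothetical helper: runs the sink factory and, on failure, rethrows with the
     * original exception attached as the cause instead of logging it separately.
     */
    static <T> T createOrWrap(Supplier<T> sinkFactory, String tableIdentifier, String optionKey) {
        try {
            return sinkFactory.get();
        } catch (Exception e) {
            String msg =
                    String.format(
                            "The table %s does not support atomicity, please set %s to false and try again.",
                            tableIdentifier, optionKey);
            // The cause carries the stack trace, so no extra LOG.error call is needed here.
            throw new TableException(msg, e);
        }
    }

    public static void main(String[] args) {
        try {
            // Placeholder arguments; the real code would pass the table identifier and option key.
            createOrWrap(
                    () -> {
                        throw new IllegalStateException("no factory found");
                    },
                    "cat.db.t",
                    "placeholder.option.key");
        } catch (TableException e) {
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause().getMessage());
        }
    }
}
```

With the cause attached, callers still see the root failure in the stack trace, and the message no longer has to point users at the logs.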
##########
docs/content/docs/dev/table/sql/create.md:
##########
@@ -616,6 +616,7 @@ INSERT INTO my_rtas_table SELECT id, name, age FROM source_table WHERE mod(id, 1
* Does not support specifying primary key constraints yet.
**Note:** By default, RTAS is non-atomic which means the table won't be dropped or restored to its origin automatically if occur errors while inserting data into the table.
+**Note:** In the InMemoryCatalog scenario, the drop table only removes the metadata and does not clean up the underlying data, when reusing RTAS, it is recommended to make sure that the underlying data has been cleaned up first, otherwise it may look like the data is duplicated.
Review Comment:
**Note:** When using the in-memory catalog, dropping a table removes only the
metadata, not the data in the physical table. Users therefore need to ensure
that the underlying data is cleaned up first; otherwise, it will lead to data
duplication.