rdblue commented on code in PR #4519:
URL: https://github.com/apache/iceberg/pull/4519#discussion_r846842598
##########
flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -387,19 +387,13 @@ private String operatorName(String suffix) {
boolean upsertMode = upsert ||
PropertyUtil.propertyAsBoolean(table.properties(),
UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT);
- // Validate the equality fields and partition fields if we enable the
upsert mode.
+ // `upsert` mode is not allowed in Flink 1.12 due to correctness issues.
+ // See in https://github.com/apache/iceberg/pull/4364 for more
information.
if (upsertMode) {
- Preconditions.checkState(!overwrite,
- "OVERWRITE mode shouldn't be enable when configuring to use UPSERT
data stream.");
- Preconditions.checkState(!equalityFieldIds.isEmpty(),
- "Equality field columns shouldn't be empty when configuring to use
UPSERT data stream.");
- if (!table.spec().isUnpartitioned()) {
- for (PartitionField partitionField : table.spec().fields()) {
-
Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
- "In UPSERT mode, partition field '%s' should be included in
equality fields: '%s'",
- partitionField, equalityFieldColumns);
- }
- }
+ throw new UnsupportedOperationException(
+ "upsert mode is not supported in Flink 1.12 due to correctness
issues. Please upgrade to Flink 1.13+ if " +
Review Comment:
We typically use sentence case for error messages. Can you use either
`Upsert` or `UPSERT`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]