twalthr commented on code in PR #27426:
URL: https://github.com/apache/flink/pull/27426#discussion_r2716031642
##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -2258,6 +2258,34 @@ void testInvalidUpsertOverwrite() {
.fails("OVERWRITE expression is only used with INSERT statement.");
}
+ @Test
+ void testInsertOnConflict() {
Review Comment:
Can we also add tests for statement sets?
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java:
##########
@@ -632,6 +633,42 @@ Table fromChangelogStream(
DataStream<Row> toChangelogStream(
Table table, Schema targetSchema, ChangelogMode changelogMode);
+ /**
+ * Converts the given {@link Table} into a {@link DataStream} of changelog entries.
+ *
+ * <p>Compared to {@link #toDataStream(Table)}, this method produces instances of {@link Row}
+ * and sets the {@link RowKind} flag that is contained in every record during runtime. The
+ * runtime behavior is similar to that of a {@link DynamicTableSink}.
+ *
+ * <p>This method requires an explicitly declared {@link ChangelogMode}. For example, use {@link
+ * ChangelogMode#upsert()} if the stream will not contain {@link RowKind#UPDATE_BEFORE}, or
+ * {@link ChangelogMode#insertOnly()} for non-updating streams.
+ *
+ * <p>Note that the type system of the table ecosystem is richer than the one of the DataStream
+ * API. The table runtime will make sure to properly serialize the output records to the first
+ * operator of the DataStream API. Afterwards, the {@link Types} semantics of the DataStream API
+ * need to be considered.
+ *
+ * <p>If the input table contains a single rowtime column, it will be propagated into a stream
+ * record's timestamp. Watermarks will be propagated as well. However, it is also possible to
+ * write out the rowtime as a metadata column. See {@link #toChangelogStream(Table, Schema)} for
+ * more information and examples on how to declare a {@link Schema}.
+ *
+ * @param table The {@link Table} to convert. It can be updating or insert-only.
+ * @param targetSchema The {@link Schema} that decides about the final external representation
+ * in {@link DataStream} records.
+ * @param changelogMode The required kinds of changes in the result changelog. An exception will
+ * be thrown if the given updating table cannot be represented in this changelog mode.
+ * @param sinkConflictStrategy Conflict strategy to use for conflicts when an upsert key differs
+ * from the primary key of the sink.
Review Comment:
I would also use `InsertConflictStrategy` here. From a DataStream API
perspective there is no "sink"; we just insert records into a stream.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SinkConflictStrategy.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Defines the conflict resolution strategies for INSERT INTO statements when the upsert key differs
+ * from the sink's primary key.
+ *
+ * <p>These strategies are used with the ON CONFLICT clause:
+ *
+ * <ul>
+ * <li>{@code ON CONFLICT DO ERROR} - Throw an exception on primary key constraint violation
+ * <li>{@code ON CONFLICT DO NOTHING} - Keep the first record, ignore subsequent conflicts
+ * <li>{@code ON CONFLICT DO DEDUPLICATE} - Maintain history for rollback (current behavior)
+ * </ul>
+ */
+@PublicEvolving
+public enum SinkConflictStrategy {
Review Comment:
For future evolvability, I would suggest making this a class with a builder
pattern. This allows for future complex strategies with parameters.
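
A minimal sketch of what such a class could look like, to make the suggestion
concrete. Everything here is an assumption for illustration and not code from
the PR: the class name follows the `InsertConflictStrategy` rename proposed in
this review, the nested `Behavior` enum mirrors the three ON CONFLICT variants
listed in the Javadoc above, and `newBuilder()`, `behavior(...)`, and the
DEDUPLICATE default are hypothetical. The Flink annotation is omitted so the
block compiles on its own.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of a conflict strategy as a class with a builder.
 * Not the class from the PR; names and defaults are assumptions.
 */
final class InsertConflictStrategy {

    /** The three behaviors named in the Javadoc under review. */
    enum Behavior {
        ERROR,
        NOTHING,
        DEDUPLICATE
    }

    private final Behavior behavior;

    // Private constructor: instances are only created through the builder.
    private InsertConflictStrategy(Behavior behavior) {
        this.behavior = Objects.requireNonNull(behavior, "behavior");
    }

    static Builder newBuilder() {
        return new Builder();
    }

    Behavior getBehavior() {
        return behavior;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof InsertConflictStrategy)) {
            return false;
        }
        return behavior == ((InsertConflictStrategy) o).behavior;
    }

    @Override
    public int hashCode() {
        return Objects.hash(behavior);
    }

    @Override
    public String toString() {
        return "ON CONFLICT DO " + behavior;
    }

    /** Builder; future parameters could be added here without breaking callers. */
    static final class Builder {

        // Assumed default, based on "(current behavior)" in the Javadoc.
        private Behavior behavior = Behavior.DEDUPLICATE;

        private Builder() {}

        Builder behavior(Behavior behavior) {
            this.behavior = Objects.requireNonNull(behavior, "behavior");
            return this;
        }

        InsertConflictStrategy build() {
            return new InsertConflictStrategy(behavior);
        }
    }
}
```

With this shape, a later strategy that needs a parameter would only add a
builder method and a field, whereas an enum constant cannot carry
per-instance configuration.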
##########
flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd:
##########
@@ -126,6 +126,7 @@
"org.apache.flink.sql.parser.ddl.SqlWatermark"
"org.apache.flink.sql.parser.dml.RichSqlInsert"
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
+ "org.apache.flink.sql.parser.dml.SinkConflictStrategy"
Review Comment:
nit: The parser doesn't know the concept of a sink.
```suggestion
"org.apache.flink.sql.parser.dml.InsertConflictStrategy"
```