fsk119 commented on code in PR #22525:
URL: https://github.com/apache/flink/pull/22525#discussion_r1195847470
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -78,6 +87,11 @@ public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
return rowLevelDeleteMode;
}
+ @Nonnull
Review Comment:
Actually, the code style guide suggests that it is not necessary to mark
the field as non-null. BTW, it would be very verbose if we marked every
method's return type as non-null [1].
[1]
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#nullability-of-the-mutable-parts
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
}
private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
- return schema.getPrimaryKey()
- .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
- .orElse(new int[0]);
+ if (schema.getPrimaryKey().isPresent()) {
+ UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+ int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+ // sinkRowType may not contain full primary keys in case of row-level update or delete.
+ // in such case, return an empty array
+ for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+ int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+ if (fieldIndex == -1) {
+ return new int[0];
+ }
+ primaryKeyIndices[i] = fieldIndex;
+ }
+ return primaryKeyIndices;
+ } else {
+ return new int[0];
+ }
}
private RowType getPhysicalRowType(ResolvedSchema schema) {
+ // row-level modification may only write partial columns
+ if (tableSinkSpec.getSinkAbilities() != null) {
+ for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) {
+ if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+ RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+ return getPhysicalRowType(schema, rowLevelUpdateSpec.getRequireColumnIndices());
+ } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+ RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec;
+ return getPhysicalRowType(
+ schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
Review Comment:
Fine. We can modify this in the future if needed.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -647,15 +649,51 @@ private InternalTypeInfo<RowData> getInputTypeInfo() {
}
private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
- return schema.getPrimaryKey()
- .map(k -> k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
- .orElse(new int[0]);
+ if (schema.getPrimaryKey().isPresent()) {
+ UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+ int[] primaryKeyIndices = new int[uniqueConstraint.getColumns().size()];
+ // sinkRowType may not contain full primary keys in case of row-level update or delete.
+ // in such case, return an empty array
+ for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+ int fieldIndex = sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+ if (fieldIndex == -1) {
+ return new int[0];
+ }
Review Comment:
I mean, why don't we return all available positions here?
```
uniqueConstraint.getColumns().stream()
.mapToInt(sinkRowType::getFieldIndex)
.filter(i -> i != -1)
.toArray();
```
I see that the indices are used as the shuffle key, so in the update
statement we don't need a shuffle?
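To make the difference between the two options concrete, here is a minimal standalone sketch. It does not use Flink classes: a plain `List<String>` stands in for `RowType`, and `List#indexOf` stands in for `getFieldIndex` (both return -1 for a missing name). The class and method names (`PrimaryKeyIndicesSketch`, `allOrNothing`, `availableOnly`) are hypothetical and only for illustration. It assumes "return all available positions" means keeping the indices that are found, i.e. filtering on `!= -1`.

```java
import java.util.Arrays;
import java.util.List;

public class PrimaryKeyIndicesSketch {

    // Hypothetical stand-in for RowType#getFieldIndex: position of the name, or -1 if absent.
    static int fieldIndex(List<String> sinkFields, String name) {
        return sinkFields.indexOf(name);
    }

    // Behaviour in the PR: if any primary key column is missing, return an empty array.
    static int[] allOrNothing(List<String> sinkFields, List<String> pkColumns) {
        int[] indices = new int[pkColumns.size()];
        for (int i = 0; i < pkColumns.size(); i++) {
            int idx = fieldIndex(sinkFields, pkColumns.get(i));
            if (idx == -1) {
                return new int[0];
            }
            indices[i] = idx;
        }
        return indices;
    }

    // Alternative raised in review: keep the positions of the key columns that are present.
    static int[] availableOnly(List<String> sinkFields, List<String> pkColumns) {
        return pkColumns.stream()
                .mapToInt(c -> fieldIndex(sinkFields, c))
                .filter(i -> i != -1)
                .toArray();
    }

    public static void main(String[] args) {
        // The sink row contains only part of the primary key (id, region).
        List<String> sinkFields = Arrays.asList("id", "name");
        List<String> pkColumns = Arrays.asList("id", "region");
        System.out.println(Arrays.toString(allOrNothing(sinkFields, pkColumns)));  // prints []
        System.out.println(Arrays.toString(availableOnly(sinkFields, pkColumns))); // prints [0]
    }
}
```

With a partial key, the second variant would shuffle on a subset of the primary key, so whether it is preferable depends on whether a shuffle is wanted at all for row-level update or delete.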
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java:
##########
@@ -38,27 +38,36 @@
/**
* A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * delete mode & required physical column indices to/from JSON, but also can delete existing data
+ * for {@link org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("RowLevelDelete")
public class RowLevelDeleteSpec implements SinkAbilitySpec {
public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode";
+ public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+ "requiredPhysicalColumnIndices";
Review Comment:
Oh, I meant it would be better to rename it to `requiredPhysicalColumn` to
align the naming. But it's fine to use `requiredPhysicalColumnIndices`.