cshuo commented on a change in pull request #16239:
URL: https://github.com/apache/flink/pull/16239#discussion_r659420284



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -175,7 +175,7 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
             leftFieldCnt + rightFields.indexOf(f)))
           .toSeq)
       // select shortest unique key as primary key
-      uniqueKeySetInputRefs
+      upsertKeySetInputRefs

Review comment:
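       nit: please update the corresponding comment as well; it still says "select shortest unique key as primary key" instead of referring to the upsert key.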

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java
##########
@@ -135,17 +134,18 @@ public String toString() {
     static List<RankProcessStrategy> analyzeRankProcessStrategies(
             StreamPhysicalRel rank, ImmutableBitSet partitionKey, RelCollation 
orderKey) {
 
-        RelMetadataQuery mq = rank.getCluster().getMetadataQuery();
+        FlinkRelMetadataQuery mq = (FlinkRelMetadataQuery) 
rank.getCluster().getMetadataQuery();
         List<RelFieldCollation> fieldCollations = 
orderKey.getFieldCollations();
         boolean isUpdateStream = !ChangelogPlanUtils.inputInsertOnly(rank);
         RelNode input = rank.getInput(0);
 
         if (isUpdateStream) {
-            Set<ImmutableBitSet> uniqueKeys = mq.getUniqueKeys(input);
-            if (uniqueKeys == null
-                    || uniqueKeys.isEmpty()
+            Set<ImmutableBitSet> upsertKeys =
+                    mq.getUpsertKeysInKeyGroupRange(input, 
partitionKey.toArray());
+            if (upsertKeys == null
+                    || upsertKeys.isEmpty()
                     // unique key should contains partition key
-                    || uniqueKeys.stream().noneMatch(k -> 
k.contains(partitionKey))) {
+                    || upsertKeys.stream().noneMatch(k -> 
k.contains(partitionKey))) {

Review comment:
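       nit: please update the corresponding comment as well; it still says "unique key should contains partition key" instead of referring to the upsert key.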

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterialize.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A operator that maintains the records corresponding to the upsert keys in 
the state, it receives
+ * the upstream changelog records and generate an upsert view for the 
downstream.
+ *
+ * <ul>
+ *   <li>For insert record, append the state and collect current record.
+ *   <li>For delete record, delete in the state, collect delete record when 
the state is empty.
+ *   <li>For delete record, delete in the state, collect the last one when the 
state is not empty.
+ * </ul>
+ */
+public class SinkUpsertMaterialize extends TableStreamOperator<RowData>

Review comment:
       Suggest renaming SinkUpsertMaterialize -> SinkUpsertMaterializer.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -149,12 +154,40 @@ public DynamicTableSinkSpec getTableSinkSpec() {
 
             // apply keyBy partition transformation if needed
             inputTransform =
-                    applyKeyByForDifferentParallelism(
+                    applyKeyByIfNeeded(
                             physicalRowType,
                             schema.getPrimaryKey().orElse(null),
                             inputTransform,
                             inputParallelism,
-                            sinkParallelism);
+                            sinkParallelism,
+                            upsertMaterialize);
+
+            if (upsertMaterialize) {
+                GeneratedRecordEqualiser equaliser =
+                        new EqualiserCodeGenerator(physicalRowType)
+                                
.generateRecordEqualiser("SinkMaterializeEqualiser");
+                SinkUpsertMaterialize operator =
+                        new SinkUpsertMaterialize(
+                                StateConfigUtil.createTtlConfig(
+                                        
tableConfig.getIdleStateRetention().toMillis()),
+                                
InternalTypeInfo.of(physicalRowType).toSerializer(),
+                                equaliser);
+                OneInputTransformation<RowData, RowData> materializeTransform =
+                        new OneInputTransformation<>(
+                                inputTransform,
+                                "SinkMaterialize",

Review comment:
       Should "SinkMaterialize" be renamed to "SinkMaterializer"?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterialize.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A operator that maintains the records corresponding to the upsert keys in 
the state, it receives
+ * the upstream changelog records and generate an upsert view for the 
downstream.
+ *
+ * <ul>
+ *   <li>For insert record, append the state and collect current record.
+ *   <li>For delete record, delete in the state, collect delete record when 
the state is empty.
+ *   <li>For delete record, delete in the state, collect the last one when the 
state is not empty.
+ * </ul>
+ */
+public class SinkUpsertMaterialize extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {

Review comment:
       Please add a unit test for this operator as well.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
##########
@@ -161,12 +161,12 @@ class TemporalJoinRewriteWithUniqueKeyRule extends 
RelOptRule(
     val rightFields = snapshot.getRowType.getFieldList
     val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
 
-    val uniqueKeySet = fmq.getUniqueKeys(snapshot.getInput())
+    val upsertKeySet = fmq.getUpsertKeys(snapshot.getInput())

Review comment:
       It may be feasible to add a test case that places a rowtime Deduplicate 
before the Snapshot, so that the upsert keys differ from the unique keys.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to