This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c1b0153  [hotfix] Rename ambiguous key to primary key
c1b0153 is described below

commit c1b0153a8f93c76e21ccfc872a28126f40fb1c46
Author: Jane Chan <[email protected]>
AuthorDate: Fri Mar 18 10:39:37 2022 +0800

    [hotfix] Rename ambiguous key to primary key
    
    This closes #51
---
 .../connector/sink/BucketStreamPartitioner.java     |  2 +-
 .../table/store/connector/sink/StoreSinkWriter.java |  8 ++++----
 .../apache/flink/table/store/sink/SinkRecord.java   | 12 ++++++------
 .../flink/table/store/sink/SinkRecordConverter.java | 21 +++++++++++----------
 .../store/kafka/KafkaLogSerializationSchema.java    | 20 ++++++++++----------
 .../flink/table/store/kafka/KafkaLogTestUtils.java  |  6 +++---
 6 files changed, 35 insertions(+), 34 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index a20253b..f69ecc0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -54,7 +54,7 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
         RowData row = record.getInstance().getValue();
-        int bucket = recordConverter.bucket(row, recordConverter.key(row));
+        int bucket = recordConverter.bucket(row, recordConverter.primaryKey(row));
         return bucket % numberOfChannels;
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 6fea71d..4effdd8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -115,18 +115,18 @@ public class StoreSinkWriter<WriterStateT>
         switch (record.row().getRowKind()) {
             case INSERT:
             case UPDATE_AFTER:
-                if (record.key().getArity() == 0) {
+                if (record.primaryKey().getArity() == 0) {
                     writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
                 } else {
-                    writer.write(ValueKind.ADD, record.key(), record.row());
+                    writer.write(ValueKind.ADD, record.primaryKey(), record.row());
                 }
                 break;
             case UPDATE_BEFORE:
             case DELETE:
-                if (record.key().getArity() == 0) {
+                if (record.primaryKey().getArity() == 0) {
                     writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
                 } else {
-                    writer.write(ValueKind.DELETE, record.key(), record.row());
+                    writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
                 }
                 break;
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
index 25b08c8..4fcccca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
@@ -31,16 +31,16 @@ public class SinkRecord {
 
     private final int bucket;
 
-    private final BinaryRowData key;
+    private final BinaryRowData primaryKey;
 
     private final RowData row;
 
-    public SinkRecord(BinaryRowData partition, int bucket, BinaryRowData key, RowData row) {
+    public SinkRecord(BinaryRowData partition, int bucket, BinaryRowData primaryKey, RowData row) {
         checkArgument(partition.getRowKind() == RowKind.INSERT);
-        checkArgument(key.getRowKind() == RowKind.INSERT);
+        checkArgument(primaryKey.getRowKind() == RowKind.INSERT);
         this.partition = partition;
         this.bucket = bucket;
-        this.key = key;
+        this.primaryKey = primaryKey;
         this.row = row;
     }
 
@@ -52,8 +52,8 @@ public class SinkRecord {
         return bucket;
     }
 
-    public BinaryRowData key() {
-        return key;
+    public BinaryRowData primaryKey() {
+        return primaryKey;
     }
 
     public RowData row() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 9454d37..2280418 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -36,30 +36,31 @@ public class SinkRecordConverter {
 
     private final Projection<RowData, BinaryRowData> partProjection;
 
-    private final Projection<RowData, BinaryRowData> keyProjection;
+    private final Projection<RowData, BinaryRowData> pkProjection;
 
-    public SinkRecordConverter(int numBucket, RowType inputType, int[] partitions, int[] keys) {
+    public SinkRecordConverter(
+            int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
         this.numBucket = numBucket;
         this.allProjection =
                 CodeGenUtils.newProjection(
                         inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
         this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
-        this.keyProjection = CodeGenUtils.newProjection(inputType, keys);
+        this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
     }
 
     public SinkRecord convert(RowData row) {
         BinaryRowData partition = partProjection.apply(row);
-        BinaryRowData key = key(row);
-        int bucket = bucket(row, key);
-        return new SinkRecord(partition, bucket, key, row);
+        BinaryRowData primaryKey = primaryKey(row);
+        int bucket = bucket(row, primaryKey);
+        return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
-    public BinaryRowData key(RowData row) {
-        return keyProjection.apply(row);
+    public BinaryRowData primaryKey(RowData row) {
+        return pkProjection.apply(row);
     }
 
-    public int bucket(RowData row, BinaryRowData key) {
-        int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
+    public int bucket(RowData row, BinaryRowData primaryKey) {
+        int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
         return Math.abs(hash % numBucket);
     }
 
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index e9b4081..55fab14 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -35,20 +35,20 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
     private static final long serialVersionUID = 1L;
 
     private final String topic;
-    @Nullable private final SerializationSchema<RowData> keySerializer;
+    @Nullable private final SerializationSchema<RowData> primaryKeySerializer;
     private final SerializationSchema<RowData> valueSerializer;
     private final LogChangelogMode changelogMode;
 
     public KafkaLogSerializationSchema(
             String topic,
-            @Nullable SerializationSchema<RowData> keySerializer,
+            @Nullable SerializationSchema<RowData> primaryKeySerializer,
             SerializationSchema<RowData> valueSerializer,
             LogChangelogMode changelogMode) {
         this.topic = topic;
-        this.keySerializer = keySerializer;
+        this.primaryKeySerializer = primaryKeySerializer;
         this.valueSerializer = valueSerializer;
         this.changelogMode = changelogMode;
-        if (changelogMode == LogChangelogMode.UPSERT && keySerializer == null) {
+        if (changelogMode == LogChangelogMode.UPSERT && primaryKeySerializer == null) {
             throw new IllegalArgumentException(
                     "Can not use upsert changelog mode for non-pk table.");
         }
@@ -58,8 +58,8 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
     public void open(
             SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
             throws Exception {
-        if (keySerializer != null) {
-            keySerializer.open(context);
+        if (primaryKeySerializer != null) {
+            primaryKeySerializer.open(context);
         }
         valueSerializer.open(context);
     }
@@ -69,10 +69,10 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
             SinkRecord element, KafkaSinkContext context, Long timestamp) {
         RowKind kind = element.row().getRowKind();
 
-        byte[] keyBytes = null;
+        byte[] primaryKeyBytes = null;
         byte[] valueBytes = null;
-        if (keySerializer != null) {
-            keyBytes = keySerializer.serialize(element.key());
+        if (primaryKeySerializer != null) {
+            primaryKeyBytes = primaryKeySerializer.serialize(element.primaryKey());
             if (changelogMode == LogChangelogMode.ALL
                     || kind == RowKind.INSERT
                     || kind == RowKind.UPDATE_AFTER) {
@@ -81,6 +81,6 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
         } else {
             valueBytes = valueSerializer.serialize(element.row());
         }
-        return new ProducerRecord<>(topic, element.bucket(), keyBytes, valueBytes);
+        return new ProducerRecord<>(topic, element.bucket(), primaryKeyBytes, valueBytes);
     }
 }
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index b4b2efc..a375d14 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -192,11 +192,11 @@ public class KafkaLogTestUtils {
         return createContext(name, type, keys, options);
     }
 
-    static SinkRecord testRecord(boolean keyed, int bucket, int key, int value, RowKind rowKind) {
+    static SinkRecord testRecord(boolean hasPk, int bucket, int pk, int value, RowKind rowKind) {
         return new SinkRecord(
                 EMPTY_ROW,
                 bucket,
-                keyed ? row(key) : EMPTY_ROW,
-                GenericRowData.ofKind(rowKind, key, value));
+                hasPk ? row(pk) : EMPTY_ROW,
+                GenericRowData.ofKind(rowKind, pk, value));
     }
 }

Reply via email to