This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c1b0153 [hotfix] Rename ambiguous key to primary key
c1b0153 is described below
commit c1b0153a8f93c76e21ccfc872a28126f40fb1c46
Author: Jane Chan <[email protected]>
AuthorDate: Fri Mar 18 10:39:37 2022 +0800
[hotfix] Rename ambiguous key to primary key
This closes #51
---
.../connector/sink/BucketStreamPartitioner.java | 2 +-
.../table/store/connector/sink/StoreSinkWriter.java | 8 ++++----
.../apache/flink/table/store/sink/SinkRecord.java | 12 ++++++------
.../flink/table/store/sink/SinkRecordConverter.java | 21 +++++++++++----------
.../store/kafka/KafkaLogSerializationSchema.java | 20 ++++++++++----------
.../flink/table/store/kafka/KafkaLogTestUtils.java | 6 +++---
6 files changed, 35 insertions(+), 34 deletions(-)
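For quick orientation, a minimal sketch of how the renamed API reads after this commit; the route() helper and the converter/row variables are hypothetical, while primaryKey(), bucket() and convert() are exactly the methods touched by the diff below:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.store.sink.SinkRecord;
    import org.apache.flink.table.store.sink.SinkRecordConverter;

    // Hypothetical helper, not part of this commit.
    static SinkRecord route(SinkRecordConverter converter, RowData row) {
        BinaryRowData pk = converter.primaryKey(row); // formerly converter.key(row)
        int bucket = converter.bucket(row, pk);       // pk-less tables hash the whole row instead
        SinkRecord record = converter.convert(row);   // carries partition, bucket and primaryKey()
        assert bucket == record.bucket();
        return record;
    }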
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index a20253b..f69ecc0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -54,7 +54,7 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
@Override
public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
RowData row = record.getInstance().getValue();
- int bucket = recordConverter.bucket(row, recordConverter.key(row));
+ int bucket = recordConverter.bucket(row, recordConverter.primaryKey(row));
return bucket % numberOfChannels;
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 6fea71d..4effdd8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -115,18 +115,18 @@ public class StoreSinkWriter<WriterStateT>
switch (record.row().getRowKind()) {
case INSERT:
case UPDATE_AFTER:
- if (record.key().getArity() == 0) {
+ if (record.primaryKey().getArity() == 0) {
writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
} else {
- writer.write(ValueKind.ADD, record.key(), record.row());
+ writer.write(ValueKind.ADD, record.primaryKey(), record.row());
}
break;
case UPDATE_BEFORE:
case DELETE:
- if (record.key().getArity() == 0) {
+ if (record.primaryKey().getArity() == 0) {
writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
} else {
- writer.write(ValueKind.DELETE, record.key(), record.row());
+ writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
}
break;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
index 25b08c8..4fcccca 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
@@ -31,16 +31,16 @@ public class SinkRecord {
private final int bucket;
- private final BinaryRowData key;
+ private final BinaryRowData primaryKey;
private final RowData row;
- public SinkRecord(BinaryRowData partition, int bucket, BinaryRowData key, RowData row) {
+ public SinkRecord(BinaryRowData partition, int bucket, BinaryRowData primaryKey, RowData row) {
checkArgument(partition.getRowKind() == RowKind.INSERT);
- checkArgument(key.getRowKind() == RowKind.INSERT);
+ checkArgument(primaryKey.getRowKind() == RowKind.INSERT);
this.partition = partition;
this.bucket = bucket;
- this.key = key;
+ this.primaryKey = primaryKey;
this.row = row;
}
@@ -52,8 +52,8 @@ public class SinkRecord {
return bucket;
}
- public BinaryRowData key() {
- return key;
+ public BinaryRowData primaryKey() {
+ return primaryKey;
}
public RowData row() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 9454d37..2280418 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -36,30 +36,31 @@ public class SinkRecordConverter {
private final Projection<RowData, BinaryRowData> partProjection;
- private final Projection<RowData, BinaryRowData> keyProjection;
+ private final Projection<RowData, BinaryRowData> pkProjection;
- public SinkRecordConverter(int numBucket, RowType inputType, int[] partitions, int[] keys) {
+ public SinkRecordConverter(
+ int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
this.numBucket = numBucket;
this.allProjection =
CodeGenUtils.newProjection(
inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
- this.keyProjection = CodeGenUtils.newProjection(inputType, keys);
+ this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
}
public SinkRecord convert(RowData row) {
BinaryRowData partition = partProjection.apply(row);
- BinaryRowData key = key(row);
- int bucket = bucket(row, key);
- return new SinkRecord(partition, bucket, key, row);
+ BinaryRowData primaryKey = primaryKey(row);
+ int bucket = bucket(row, primaryKey);
+ return new SinkRecord(partition, bucket, primaryKey, row);
}
- public BinaryRowData key(RowData row) {
- return keyProjection.apply(row);
+ public BinaryRowData primaryKey(RowData row) {
+ return pkProjection.apply(row);
}
- public int bucket(RowData row, BinaryRowData key) {
- int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
+ public int bucket(RowData row, BinaryRowData primaryKey) {
+ int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
return Math.abs(hash % numBucket);
}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index e9b4081..55fab14 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -35,20 +35,20 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
private static final long serialVersionUID = 1L;
private final String topic;
- @Nullable private final SerializationSchema<RowData> keySerializer;
+ @Nullable private final SerializationSchema<RowData> primaryKeySerializer;
private final SerializationSchema<RowData> valueSerializer;
private final LogChangelogMode changelogMode;
public KafkaLogSerializationSchema(
String topic,
- @Nullable SerializationSchema<RowData> keySerializer,
+ @Nullable SerializationSchema<RowData> primaryKeySerializer,
SerializationSchema<RowData> valueSerializer,
LogChangelogMode changelogMode) {
this.topic = topic;
- this.keySerializer = keySerializer;
+ this.primaryKeySerializer = primaryKeySerializer;
this.valueSerializer = valueSerializer;
this.changelogMode = changelogMode;
- if (changelogMode == LogChangelogMode.UPSERT && keySerializer == null) {
+ if (changelogMode == LogChangelogMode.UPSERT && primaryKeySerializer == null) {
throw new IllegalArgumentException(
"Can not use upsert changelog mode for non-pk table.");
}
@@ -58,8 +58,8 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
public void open(
SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
throws Exception {
- if (keySerializer != null) {
- keySerializer.open(context);
+ if (primaryKeySerializer != null) {
+ primaryKeySerializer.open(context);
}
valueSerializer.open(context);
}
@@ -69,10 +69,10 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
SinkRecord element, KafkaSinkContext context, Long timestamp) {
RowKind kind = element.row().getRowKind();
- byte[] keyBytes = null;
+ byte[] primaryKeyBytes = null;
byte[] valueBytes = null;
- if (keySerializer != null) {
- keyBytes = keySerializer.serialize(element.key());
+ if (primaryKeySerializer != null) {
+ primaryKeyBytes = primaryKeySerializer.serialize(element.primaryKey());
if (changelogMode == LogChangelogMode.ALL
|| kind == RowKind.INSERT
|| kind == RowKind.UPDATE_AFTER) {
@@ -81,6 +81,6 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
} else {
valueBytes = valueSerializer.serialize(element.row());
}
- return new ProducerRecord<>(topic, element.bucket(), keyBytes, valueBytes);
+ return new ProducerRecord<>(topic, element.bucket(), primaryKeyBytes, valueBytes);
}
}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index b4b2efc..a375d14 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -192,11 +192,11 @@ public class KafkaLogTestUtils {
return createContext(name, type, keys, options);
}
- static SinkRecord testRecord(boolean keyed, int bucket, int key, int value, RowKind rowKind) {
+ static SinkRecord testRecord(boolean hasPk, int bucket, int pk, int value, RowKind rowKind) {
return new SinkRecord(
EMPTY_ROW,
bucket,
- keyed ? row(key) : EMPTY_ROW,
- GenericRowData.ofKind(rowKind, key, value));
+ hasPk ? row(pk) : EMPTY_ROW,
+ GenericRowData.ofKind(rowKind, pk, value));
}
}