This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 731580b9b [flink][bug] Partition and bucket should be reset when a new record is set in CdcRecordKeyAndBucketExtractor (#895)
731580b9b is described below
commit 731580b9bbc8e8fe4d92f6b791a92983d6a3d7c1
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 13 14:55:11 2023 +0800
[flink][bug] Partition and bucket should be reset when a new record is set in CdcRecordKeyAndBucketExtractor (#895)
---
.../sink/cdc/CdcRecordKeyAndBucketExtractor.java | 4 +
.../cdc/CdcRecordKeyAndBucketExtractorTest.java | 117 +++++++++++++++++++++
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 14 +--
3 files changed, 129 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
index b1b89f853..cd19e04d8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
@@ -65,6 +65,10 @@ public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor<Cdc
@Override
public void setRecord(CdcRecord record) {
this.record = record;
+
+ this.partition = null;
+ this.bucketKey = null;
+ this.bucket = null;
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
new file mode 100644
index 000000000..bdf63ea77
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.RowDataKeyAndBucketExtractor;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.types.DataTypesTest.assertThat;
+
+/** Tests for {@link CdcRecordKeyAndBucketExtractor}. */
+public class CdcRecordKeyAndBucketExtractorTest {
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"pt1", "pt2", "k1", "v1", "k2", "v2"});
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testExtract() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableSchema schema = createTableSchema();
+ RowDataKeyAndBucketExtractor expected = new RowDataKeyAndBucketExtractor(schema);
+ CdcRecordKeyAndBucketExtractor actual = new CdcRecordKeyAndBucketExtractor(schema);
+
+ int numTests = random.nextInt(1000) + 1;
+ for (int i = 0; i < numTests; i++) {
+ String pt1 = UUID.randomUUID().toString();
+ int pt2 = random.nextInt();
+ long k1 = random.nextLong();
+ int v1 = random.nextInt();
+ String k2 = UUID.randomUUID().toString();
+ String v2 = UUID.randomUUID().toString();
+
+ GenericRowData rowData =
+ GenericRowData.of(
+ StringData.fromString(pt1),
+ pt2,
+ k1,
+ v1,
+ StringData.fromString(k2),
+ StringData.fromString(v2));
+ expected.setRecord(rowData);
+
+ Map<String, String> fields = new HashMap<>();
+ fields.put("pt1", pt1);
+ fields.put("pt2", String.valueOf(pt2));
+ fields.put("k1", String.valueOf(k1));
+ fields.put("v1", String.valueOf(v1));
+ fields.put("k2", k2);
+ fields.put("v2", v2);
+
+ actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ assertThat(actual.partition()).isEqualTo(expected.partition());
+ assertThat(actual.bucket()).isEqualTo(expected.bucket());
+
+ actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+ assertThat(actual.partition()).isEqualTo(expected.partition());
+ assertThat(actual.bucket()).isEqualTo(expected.bucket());
+ }
+ }
+
+ private TableSchema createTableSchema() throws Exception {
+ return SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString())),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Arrays.asList("pt1", "pt2"),
+ Arrays.asList("pt1", "pt2", "k1", "k2"),
+ new HashMap<>(),
+ ""));
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index eaac7448b..7be475e3c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -42,6 +42,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
@@ -72,11 +73,10 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numEvents = random.nextInt(1500) + 1;
- int numSchemaChanges = random.nextInt(10) + 1;
+ int numSchemaChanges = Math.min(numEvents / 2, random.nextInt(10) + 1);
int numPartitions = random.nextInt(3) + 1;
int numKeys = random.nextInt(150) + 1;
int numBucket = random.nextInt(5) + 1;
- boolean shuffleByPartitionEnable = random.nextBoolean();
boolean enableFailure = random.nextBoolean();
TestCdcEvent[] events = new TestCdcEvent[numEvents];
@@ -160,11 +160,14 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
new String[] {"pt", "k", "v0"}),
Collections.singletonList("pt"),
Arrays.asList("pt", "k"),
- numBucket,
- shuffleByPartitionEnable);
+ numBucket);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointInterval(100);
+ if (!enableFailure) {
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
TestCdcSourceFunction sourceFunction =
new TestCdcSourceFunction(
events, record -> Integer.valueOf(record.fields().get("k")));
@@ -218,8 +221,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
RowType rowType,
List<String> partitions,
List<String> primaryKeys,
- int numBucket,
- boolean shuffleByPartitionEnable)
+ int numBucket)
throws Exception {
Options conf = new Options();
conf.set(CoreOptions.BUCKET, numBucket);