This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 397b0f910 [FLINK-37452][pipeline-connector/paimon] Support writing to Paimon append-only table
397b0f910 is described below

commit 397b0f910b8d79071d356879a27ec95ebcc7b829
Author: Kunni <[email protected]>
AuthorDate: Wed Mar 12 21:30:16 2025 +0800

    [FLINK-37452][pipeline-connector/paimon] Support writing to Paimon append-only table
    
    This closes  #3945.
---
 .../connectors/paimon/sink/PaimonHashFunction.java |  26 ++-
 .../paimon/sink/PaimonHashFunctionTest.java        |  55 +++++++
 .../paimon/sink/PaimonMetadataApplierTest.java     |  43 +++++
 .../paimon/sink/v2/PaimonSinkITCase.java           |  53 +++++-
 .../cdc/pipeline/tests/MySqlToPaimonE2eITCase.java | 179 +++++++++++++++++++++
 5 files changed, 347 insertions(+), 9 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
index b01ad39f2..405a07c39 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.function.HashFunction;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
 
+import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
@@ -35,6 +36,7 @@ import org.apache.paimon.table.FileStoreTable;
 import java.io.Serializable;
 import java.time.ZoneId;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle 
{@link DataChangeEvent}
@@ -48,8 +50,11 @@ public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serial
 
     private final RowAssignerChannelComputer channelComputer;
 
+    private final int parallelism;
+
     public PaimonHashFunction(
             Options options, TableId tableId, Schema schema, ZoneId zoneId, 
int parallelism) {
+        this.parallelism = parallelism;
         Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
         FileStoreTable table;
         try {
@@ -57,14 +62,25 @@ public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serial
         } catch (Catalog.TableNotExistException e) {
             throw new RuntimeException(e);
         }
-        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, 
zoneId);
-        channelComputer = new RowAssignerChannelComputer(table.schema(), 
parallelism);
-        channelComputer.setup(parallelism);
+        if (table instanceof AppendOnlyFileStore) {
+            this.fieldGetters = null;
+            channelComputer = null;
+        } else {
+            this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, 
zoneId);
+            channelComputer = new RowAssignerChannelComputer(table.schema(), 
parallelism);
+            channelComputer.setup(parallelism);
+        }
     }
 
     @Override
     public int hashcode(DataChangeEvent event) {
-        GenericRow genericRow = 
PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
-        return channelComputer.channel(genericRow);
+        if (channelComputer != null) {
+            GenericRow genericRow =
+                    PaimonWriterHelper.convertEventToGenericRow(event, 
fieldGetters);
+            return channelComputer.channel(genericRow);
+        } else {
+            // Avoid sending all events to the same subtask when table has no 
primary key.
+            return ThreadLocalRandom.current().nextInt(parallelism);
+        }
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
index 86ff2f7b2..5726448cb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.options.Options;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -71,6 +72,60 @@ public class PaimonHashFunctionTest {
         catalog.close();
     }
 
+    @Test
+    public void testHashCodeForAppendOnlyTable() {
+        TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
+        Map<String, String> tableOptions = new HashMap<>();
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING().notNull())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .physicalColumn("pt", DataTypes.STRING())
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+        PaimonHashFunction hashFunction =
+                new PaimonHashFunction(catalogOptions, tableId, schema, 
ZoneId.systemDefault(), 4);
+        DataChangeEvent dataChangeEvent1 =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2024")
+                                }));
+        int key1 = hashFunction.hashcode(dataChangeEvent1);
+
+        DataChangeEvent dataChangeEvent2 =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2024")
+                                }));
+        int key2 = hashFunction.hashcode(dataChangeEvent2);
+
+        DataChangeEvent dataChangeEvent3 =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("3"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2024")
+                                }));
+        int key3 = hashFunction.hashcode(dataChangeEvent3);
+        Assertions.assertTrue(
+                key1 >= 0 && key1 < 4 && key2 >= 0 && key2 < 4 && key3 >= 0 && 
key3 < 4);
+    }
+
     @Test
     public void testHashCodeForFixedBucketTable() {
         TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 1bf6ab3d6..0d1678d59 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -242,6 +242,49 @@ public class PaimonMetadataApplierTest {
                 
catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType());
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"filesystem", "hive"})
+    public void testCreateTableWithoutPrimaryKey(String metastore)
+            throws Catalog.TableNotExistException, 
Catalog.DatabaseNotEmptyException,
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
+        initialize(metastore);
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "-1");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table1"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .physicalColumn(
+                                        "col3",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .physicalColumn(
+                                        "col4",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        Table table = catalog.getTable(Identifier.fromString("test.table1"));
+        RowType tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "col1", 
DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.STRING()),
+                                new DataField(2, "col3", DataTypes.STRING()),
+                                new DataField(3, "col4", DataTypes.STRING())));
+        Assertions.assertEquals(tableSchema, table.rowType());
+        Assertions.assertEquals(new ArrayList<>(), table.primaryKeys());
+        Assertions.assertEquals(new ArrayList<>(), table.partitionKeys());
+        Assertions.assertEquals("-1", table.options().get("bucket"));
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {"filesystem", "hive"})
     public void testCreateTableWithOptions(String metastore)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index adb43483d..08b1774e5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -147,14 +147,24 @@ public class PaimonSinkITCase {
     }
 
     private List<Event> createTestEvents(boolean enableDeleteVectors) throws 
SchemaEvolveException {
+        return createTestEvents(enableDeleteVectors, false, true);
+    }
+
+    private List<Event> createTestEvents(
+            boolean enableDeleteVectors, boolean appendOnly, boolean 
enabledBucketKey)
+            throws SchemaEvolveException {
         List<Event> testEvents = new ArrayList<>();
+        Schema.Builder builder = Schema.newBuilder();
+        if (!appendOnly) {
+            builder.primaryKey("col1");
+        } else if (enabledBucketKey) {
+            builder.option("bucket-key", "col1");
+            builder.option("bucket", "5");
+        }
         // create table
         Schema schema =
-                Schema.newBuilder()
-                        .physicalColumn("col1", STRING())
+                builder.physicalColumn("col1", STRING())
                         .physicalColumn("col2", STRING())
-                        .primaryKey("col1")
-                        .option("bucket", "1")
                         .option("deletion-vectors.enabled", 
String.valueOf(enableDeleteVectors))
                         .build();
         CreateTableEvent createTableEvent = new CreateTableEvent(table1, 
schema);
@@ -230,6 +240,41 @@ public class PaimonSinkITCase {
         }
     }
 
+    @ParameterizedTest
+    @CsvSource({"filesystem, true", "hive, true", "filesystem, false", "hive, 
false"})
+    public void testSinkWithDataChangeForAppendOnlyTable(String metastore, 
boolean enabledBucketKey)
+            throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
+        initialize(metastore);
+        PaimonSink<Event> paimonSink =
+                new PaimonSink<>(
+                        catalogOptions, new 
PaimonRecordEventSerializer(ZoneId.systemDefault()));
+        PaimonWriter<Event> writer = paimonSink.createWriter(new 
MockInitContext());
+        Committer<MultiTableCommittable> committer = 
paimonSink.createCommitter();
+
+        // insert
+        writeAndCommit(
+                writer,
+                committer,
+                createTestEvents(false, true, enabledBucketKey).toArray(new 
Event[0]));
+        Assertions.assertThat(fetchResults(table1))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, "1", "1"), 
Row.ofKind(RowKind.INSERT, "2", "2"));
+
+        // Insert
+        writeAndCommit(
+                writer,
+                committer,
+                generateInsert(
+                        table1, Arrays.asList(Tuple2.of(STRING(), "3"), 
Tuple2.of(STRING(), "3"))));
+
+        Assertions.assertThat(fetchResults(table1))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, "1", "1"),
+                        Row.ofKind(RowKind.INSERT, "2", "2"),
+                        Row.ofKind(RowKind.INSERT, "3", "3"));
+    }
+
     @ParameterizedTest
     @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, 
false"})
     public void testSinkWithSchemaChange(String metastore, boolean 
enableDeleteVector)
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
index b569aed36..d9398d749 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
@@ -212,6 +212,185 @@ public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment {
         validateSinkResult(warehouse, database, "products", 
recordsInSnapshotPhase);
     }
 
+    @Test
+    public void testSinkToAppendOnlyTable() throws Exception {
+        String warehouse = sharedVolume.toString() + "/" + "paimon_" + 
UUID.randomUUID();
+        String database = inventoryDatabase.getDatabaseName();
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: mysql\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.appendOnlySource\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "  
scan.incremental.snapshot.chunk.key-column: %s.appendOnlySource:id\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: paimon\n"
+                                + "  catalog.properties.warehouse: %s\n"
+                                + "  catalog.properties.metastore: 
filesystem\n"
+                                + "  catalog.properties.cache-enabled: false\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  schema.change.behavior: evolve\n"
+                                + "  parallelism: 4",
+                        MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, 
database, warehouse);
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path paimonCdcConnector = 
TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+        Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(), MYSQL.getDatabasePort(), database);
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute(
+                    "CREATE TABLE appendOnlySource (\n"
+                            + "  id INTEGER NOT NULL,\n"
+                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+                            + "  description VARCHAR(512),\n"
+                            + "  weight FLOAT,\n"
+                            + "  enum_c enum('red', 'white') default 'red',\n"
+                            + "  json_c JSON,\n"
+                            + "  point_c POINT)");
+            stat.execute(
+                    "INSERT INTO appendOnlySource \n"
+                            + "VALUES (1,\"One\",   \"Alice\",   3.202, 'red', 
'{\"key1\": \"value1\"}', null),\n"
+                            + "       (2,\"Two\",   \"Bob\",     1.703, 
'white', '{\"key2\": \"value2\"}', null),\n"
+                            + "       (3,\"Three\", \"Cecily\",  4.105, 'red', 
'{\"key3\": \"value3\"}', null),\n"
+                            + "       (4,\"Four\",  \"Derrida\", 1.857, 
'white', '{\"key4\": \"value4\"}', null),\n"
+                            + "       (5,\"Five\",  \"Evelyn\",  5.211, 'red', 
'{\"K\": \"V\", \"k\": \"v\"}', null),\n"
+                            + "       (6,\"Six\",   \"Ferris\",  9.813, null, 
null, null),\n"
+                            + "       (7,\"Seven\", \"Grace\",   2.117, null, 
null, null),\n"
+                            + "       (8,\"Eight\", \"Hesse\",   6.819, null, 
null, null),\n"
+                            + "       (9,\"Nine\",  \"IINA\",    5.223, null, 
null, null)");
+        } catch (SQLException e) {
+            LOG.error("Create table for CDC failed.", e);
+            throw e;
+        }
+        submitPipelineJob(pipelineJob, mysqlCdcJar, paimonCdcConnector, 
mysqlDriverJar, hadoopJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+        validateSinkResult(
+                warehouse,
+                database,
+                "appendOnlySource",
+                Arrays.asList(
+                        "1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, 
null",
+                        "2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, 
null",
+                        "3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, 
null",
+                        "4, Four, Derrida, 1.857, white, {\"key4\": 
\"value4\"}, null",
+                        "5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": 
\"v\"}, null",
+                        "6, Six, Ferris, 9.813, null, null, null",
+                        "7, Seven, Grace, 2.117, null, null, null",
+                        "8, Eight, Hesse, 6.819, null, null, null",
+                        "9, Nine, IINA, 5.223, null, null, null"));
+
+        LOG.info("Begin incremental reading stage.");
+        // generate binlogs
+        List<String> recordsInIncrementalPhase;
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+
+            stat.execute(
+                    "INSERT INTO appendOnlySource VALUES 
(10,'Ten','Jukebox',0.2, null, null, null);");
+            stat.execute(
+                    "INSERT INTO appendOnlySource VALUES 
(11,'Eleven','Kryo',5.18, null, null, null);");
+            stat.execute(
+                    "INSERT INTO appendOnlySource VALUES (12,'Twelve', 'Lily', 
2.14, null, null, null);");
+            recordsInIncrementalPhase = 
createChangesAndValidateForAppendOnlyTable(stat);
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+        List<String> recordsInSnapshotPhase =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "1, One, Alice, 3.202, red, {\"key1\": 
\"value1\"}, null, null, null, null, null, null, null, null, null, null, null",
+                                "2, Two, Bob, 1.703, white, {\"key2\": 
\"value2\"}, null, null, null, null, null, null, null, null, null, null, null",
+                                "3, Three, Cecily, 4.105, red, {\"key3\": 
\"value3\"}, null, null, null, null, null, null, null, null, null, null, null",
+                                "4, Four, Derrida, 1.857, white, {\"key4\": 
\"value4\"}, null, null, null, null, null, null, null, null, null, null, null",
+                                "5, Five, Evelyn, 5.211, red, {\"K\": \"V\", 
\"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null, 
null",
+                                "6, Six, Ferris, 9.813, null, null, null, 
null, null, null, null, null, null, null, null, null, null",
+                                "7, Seven, Grace, 2.117, null, null, null, 
null, null, null, null, null, null, null, null, null, null",
+                                "8, Eight, Hesse, 6.819, null, null, null, 
null, null, null, null, null, null, null, null, null, null",
+                                "9, Nine, IINA, 5.223, null, null, null, null, 
null, null, null, null, null, null, null, null, null",
+                                "10, Ten, Jukebox, 0.2, null, null, null, 
null, null, null, null, null, null, null, null, null, null",
+                                "11, Eleven, Kryo, 5.18, null, null, null, 
null, null, null, null, null, null, null, null, null, null",
+                                "12, Twelve, Lily, 2.14, null, null, null, 
null, null, null, null, null, null, null, null, null, null"));
+        recordsInSnapshotPhase.addAll(recordsInIncrementalPhase);
+        validateSinkResult(warehouse, database, "appendOnlySource", 
recordsInSnapshotPhase);
+    }
+
+    /**
+     * Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL, 
description VARCHAR(512),
+     * weight FLOAT, enum_c enum('red', 'white'), json_c JSON.
+     */
+    private List<String> createChangesAndValidateForAppendOnlyTable(Statement 
stat)
+            throws SQLException {
+        List<String> result = new ArrayList<>();
+        StringBuilder sqlFields = new StringBuilder();
+        int id = 13;
+
+        // Add Column.
+        for (int addColumnRepeat = 0; addColumnRepeat < 10; addColumnRepeat++) 
{
+            stat.execute(
+                    String.format(
+                            "ALTER TABLE appendOnlySource ADD COLUMN 
point_c_%s VARCHAR(10);",
+                            addColumnRepeat));
+            sqlFields.append(", '1'");
+            StringBuilder resultFields = new StringBuilder();
+            for (int j = 0; j < 10; j++) {
+                if (j <= addColumnRepeat) {
+                    resultFields.append(", 1");
+                } else {
+                    resultFields.append(", null");
+                }
+            }
+            for (int j = 0; j < 1000; j++) {
+                stat.addBatch(
+                        String.format(
+                                "INSERT INTO appendOnlySource VALUES 
(%s,'finally', null, 2.14, null, null, null %s);",
+                                id, sqlFields));
+                result.add(
+                        String.format(
+                                "%s, finally, null, 2.14, null, null, null%s", 
id, resultFields));
+                id++;
+            }
+            stat.executeBatch();
+        }
+
+        // Modify Column type.
+        for (int modifyColumnRepeat = 0; modifyColumnRepeat < 10; 
modifyColumnRepeat++) {
+            for (int j = 0; j < 1000; j++) {
+                stat.addBatch(
+                        String.format(
+                                "INSERT INTO appendOnlySource VALUES 
(%s,'finally', null, 2.14, null, null, null %s);",
+                                id, sqlFields));
+                result.add(
+                        String.format(
+                                "%s, finally, null, 2.14, null, null, null%s",
+                                id, ", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1"));
+                id++;
+            }
+            stat.executeBatch();
+            stat.execute(
+                    String.format(
+                            "ALTER TABLE appendOnlySource MODIFY point_c_0 
VARCHAR(%s);",
+                            10 + modifyColumnRepeat));
+        }
+        return result;
+    }
+
     /**
      * Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL, 
description VARCHAR(512),
      * weight FLOAT, enum_c enum('red', 'white'), json_c JSON.

Reply via email to