This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 397b0f910 [FLINK-37452][pipeline-connector/paimon] Support writing to
Paimon append-only table
397b0f910 is described below
commit 397b0f910b8d79071d356879a27ec95ebcc7b829
Author: Kunni <[email protected]>
AuthorDate: Wed Mar 12 21:30:16 2025 +0800
[FLINK-37452][pipeline-connector/paimon] Support writing to Paimon
append-only table
This closes #3945.
---
.../connectors/paimon/sink/PaimonHashFunction.java | 26 ++-
.../paimon/sink/PaimonHashFunctionTest.java | 55 +++++++
.../paimon/sink/PaimonMetadataApplierTest.java | 43 +++++
.../paimon/sink/v2/PaimonSinkITCase.java | 53 +++++-
.../cdc/pipeline/tests/MySqlToPaimonE2eITCase.java | 179 +++++++++++++++++++++
5 files changed, 347 insertions(+), 9 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
index b01ad39f2..405a07c39 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
@@ -35,6 +36,7 @@ import org.apache.paimon.table.FileStoreTable;
import java.io.Serializable;
import java.time.ZoneId;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
/**
* A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle
{@link DataChangeEvent}
@@ -48,8 +50,11 @@ public class PaimonHashFunction implements
HashFunction<DataChangeEvent>, Serial
private final RowAssignerChannelComputer channelComputer;
+ private final int parallelism;
+
public PaimonHashFunction(
Options options, TableId tableId, Schema schema, ZoneId zoneId,
int parallelism) {
+ this.parallelism = parallelism;
Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
FileStoreTable table;
try {
@@ -57,14 +62,25 @@ public class PaimonHashFunction implements
HashFunction<DataChangeEvent>, Serial
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
- this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema,
zoneId);
- channelComputer = new RowAssignerChannelComputer(table.schema(),
parallelism);
- channelComputer.setup(parallelism);
+ if (table.primaryKeys().isEmpty()) { // append-only: no key to hash on
+ this.fieldGetters = null;
+ channelComputer = null;
+ } else {
+ this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema,
zoneId);
+ channelComputer = new RowAssignerChannelComputer(table.schema(),
parallelism);
+ channelComputer.setup(parallelism);
+ }
}
@Override
public int hashcode(DataChangeEvent event) {
- GenericRow genericRow =
PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
- return channelComputer.channel(genericRow);
+ if (channelComputer != null) {
+ GenericRow genericRow =
+ PaimonWriterHelper.convertEventToGenericRow(event,
fieldGetters);
+ return channelComputer.channel(genericRow);
+ } else {
+ // Avoid sending all events to the same subtask when table has no
primary key.
+ return ThreadLocalRandom.current().nextInt(parallelism);
+ }
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
index 86ff2f7b2..5726448cb 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -71,6 +72,60 @@ public class PaimonHashFunctionTest {
catalog.close();
}
+ @Test
+ public void testHashCodeForAppendOnlyTable() {
+ TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
+ Map<String, String> tableOptions = new HashMap<>();
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING().notNull())
+ .physicalColumn("col2", DataTypes.STRING())
+ .physicalColumn("pt", DataTypes.STRING())
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+ BinaryRecordDataGenerator generator =
+ new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+ PaimonHashFunction hashFunction =
+ new PaimonHashFunction(catalogOptions, tableId, schema,
ZoneId.systemDefault(), 4);
+ DataChangeEvent dataChangeEvent1 =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2024")
+ }));
+ int key1 = hashFunction.hashcode(dataChangeEvent1);
+
+ DataChangeEvent dataChangeEvent2 =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2024")
+ }));
+ int key2 = hashFunction.hashcode(dataChangeEvent2);
+
+ DataChangeEvent dataChangeEvent3 =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("3"),
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("2024")
+ }));
+ int key3 = hashFunction.hashcode(dataChangeEvent3);
+ Assertions.assertTrue(
+ key1 >= 0 && key1 < 4 && key2 >= 0 && key2 < 4 && key3 >= 0 &&
key3 < 4);
+ }
+
@Test
public void testHashCodeForFixedBucketTable() {
TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 1bf6ab3d6..0d1678d59 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -242,6 +242,49 @@ public class PaimonMetadataApplierTest {
catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType());
}
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testCreateTableWithoutPrimaryKey(String metastore)
+ throws Catalog.TableNotExistException,
Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException, SchemaEvolveException {
+ initialize(metastore);
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put("bucket", "-1");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.table1"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "col1",
+
org.apache.flink.cdc.common.types.DataTypes.STRING()
+ .notNull())
+ .physicalColumn(
+ "col2",
+
org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .physicalColumn(
+ "col3",
+
org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .physicalColumn(
+ "col4",
+
org.apache.flink.cdc.common.types.DataTypes.STRING())
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+ Table table = catalog.getTable(Identifier.fromString("test.table1"));
+ RowType tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1",
DataTypes.STRING().notNull()),
+ new DataField(1, "col2", DataTypes.STRING()),
+ new DataField(2, "col3", DataTypes.STRING()),
+ new DataField(3, "col4", DataTypes.STRING())));
+ Assertions.assertEquals(tableSchema, table.rowType());
+ Assertions.assertEquals(new ArrayList<>(), table.primaryKeys());
+ Assertions.assertEquals(new ArrayList<>(), table.partitionKeys());
+ Assertions.assertEquals("-1", table.options().get("bucket"));
+ }
+
@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testCreateTableWithOptions(String metastore)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index adb43483d..08b1774e5 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -147,14 +147,24 @@ public class PaimonSinkITCase {
}
private List<Event> createTestEvents(boolean enableDeleteVectors) throws
SchemaEvolveException {
+ return createTestEvents(enableDeleteVectors, false, true);
+ }
+
+ private List<Event> createTestEvents(
+ boolean enableDeleteVectors, boolean appendOnly, boolean
enabledBucketKey)
+ throws SchemaEvolveException {
List<Event> testEvents = new ArrayList<>();
+ Schema.Builder builder = Schema.newBuilder();
+ if (!appendOnly) {
+ builder.primaryKey("col1");
+ } else if (enabledBucketKey) {
+ builder.option("bucket-key", "col1");
+ builder.option("bucket", "5");
+ }
// create table
Schema schema =
- Schema.newBuilder()
- .physicalColumn("col1", STRING())
+ builder.physicalColumn("col1", STRING())
.physicalColumn("col2", STRING())
- .primaryKey("col1")
- .option("bucket", "1")
.option("deletion-vectors.enabled",
String.valueOf(enableDeleteVectors))
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(table1,
schema);
@@ -230,6 +240,41 @@ public class PaimonSinkITCase {
}
}
+ @ParameterizedTest
+ @CsvSource({"filesystem, true", "hive, true", "filesystem, false", "hive,
false"})
+ public void testSinkWithDataChangeForAppendOnlyTable(String metastore,
boolean enabledBucketKey)
+ throws IOException, InterruptedException,
Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException, SchemaEvolveException {
+ initialize(metastore);
+ PaimonSink<Event> paimonSink =
+ new PaimonSink<>(
+ catalogOptions, new
PaimonRecordEventSerializer(ZoneId.systemDefault()));
+ PaimonWriter<Event> writer = paimonSink.createWriter(new
MockInitContext());
+ Committer<MultiTableCommittable> committer =
paimonSink.createCommitter();
+
+ // insert
+ writeAndCommit(
+ writer,
+ committer,
+ createTestEvents(false, true, enabledBucketKey).toArray(new
Event[0]));
+ Assertions.assertThat(fetchResults(table1))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, "1", "1"),
Row.ofKind(RowKind.INSERT, "2", "2"));
+
+ // Insert
+ writeAndCommit(
+ writer,
+ committer,
+ generateInsert(
+ table1, Arrays.asList(Tuple2.of(STRING(), "3"),
Tuple2.of(STRING(), "3"))));
+
+ Assertions.assertThat(fetchResults(table1))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, "1", "1"),
+ Row.ofKind(RowKind.INSERT, "2", "2"),
+ Row.ofKind(RowKind.INSERT, "3", "3"));
+ }
+
@ParameterizedTest
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive,
false"})
public void testSinkWithSchemaChange(String metastore, boolean
enableDeleteVector)
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
index b569aed36..d9398d749 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
@@ -212,6 +212,185 @@ public class MySqlToPaimonE2eITCase extends
PipelineTestEnvironment {
validateSinkResult(warehouse, database, "products",
recordsInSnapshotPhase);
}
+ @Test
+ public void testSinkToAppendOnlyTable() throws Exception {
+ String warehouse = sharedVolume.toString() + "/" + "paimon_" +
UUID.randomUUID();
+ String database = inventoryDatabase.getDatabaseName();
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: mysql\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.appendOnlySource\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "
scan.incremental.snapshot.chunk.key-column: %s.appendOnlySource:id\n"
+ + "\n"
+ + "sink:\n"
+ + " type: paimon\n"
+ + " catalog.properties.warehouse: %s\n"
+ + " catalog.properties.metastore:
filesystem\n"
+ + " catalog.properties.cache-enabled: false\n"
+ + "\n"
+ + "pipeline:\n"
+ + " schema.change.behavior: evolve\n"
+ + " parallelism: 4",
+ MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database,
database, warehouse);
+ Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path paimonCdcConnector =
TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+ Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(), MYSQL.getDatabasePort(), database);
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute(
+ "CREATE TABLE appendOnlySource (\n"
+ + " id INTEGER NOT NULL,\n"
+ + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ + " description VARCHAR(512),\n"
+ + " weight FLOAT,\n"
+ + " enum_c enum('red', 'white') default 'red',\n"
+ + " json_c JSON,\n"
+ + " point_c POINT)");
+ stat.execute(
+ "INSERT INTO appendOnlySource \n"
+ + "VALUES (1,\"One\", \"Alice\", 3.202, 'red',
'{\"key1\": \"value1\"}', null),\n"
+ + " (2,\"Two\", \"Bob\", 1.703,
'white', '{\"key2\": \"value2\"}', null),\n"
+ + " (3,\"Three\", \"Cecily\", 4.105, 'red',
'{\"key3\": \"value3\"}', null),\n"
+ + " (4,\"Four\", \"Derrida\", 1.857,
'white', '{\"key4\": \"value4\"}', null),\n"
+ + " (5,\"Five\", \"Evelyn\", 5.211, 'red',
'{\"K\": \"V\", \"k\": \"v\"}', null),\n"
+ + " (6,\"Six\", \"Ferris\", 9.813, null,
null, null),\n"
+ + " (7,\"Seven\", \"Grace\", 2.117, null,
null, null),\n"
+ + " (8,\"Eight\", \"Hesse\", 6.819, null,
null, null),\n"
+ + " (9,\"Nine\", \"IINA\", 5.223, null,
null, null)");
+ } catch (SQLException e) {
+ LOG.error("Create table for CDC failed.", e);
+ throw e;
+ }
+ submitPipelineJob(pipelineJob, mysqlCdcJar, paimonCdcConnector,
mysqlDriverJar, hadoopJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ validateSinkResult(
+ warehouse,
+ database,
+ "appendOnlySource",
+ Arrays.asList(
+ "1, One, Alice, 3.202, red, {\"key1\": \"value1\"},
null",
+ "2, Two, Bob, 1.703, white, {\"key2\": \"value2\"},
null",
+ "3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"},
null",
+ "4, Four, Derrida, 1.857, white, {\"key4\":
\"value4\"}, null",
+ "5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\":
\"v\"}, null",
+ "6, Six, Ferris, 9.813, null, null, null",
+ "7, Seven, Grace, 2.117, null, null, null",
+ "8, Eight, Hesse, 6.819, null, null, null",
+ "9, Nine, IINA, 5.223, null, null, null"));
+
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ List<String> recordsInIncrementalPhase;
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+
+ stat.execute(
+ "INSERT INTO appendOnlySource VALUES
(10,'Ten','Jukebox',0.2, null, null, null);");
+ stat.execute(
+ "INSERT INTO appendOnlySource VALUES
(11,'Eleven','Kryo',5.18, null, null, null);");
+ stat.execute(
+ "INSERT INTO appendOnlySource VALUES (12,'Twelve', 'Lily',
2.14, null, null, null);");
+ recordsInIncrementalPhase =
createChangesAndValidateForAppendOnlyTable(stat);
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+ List<String> recordsInSnapshotPhase =
+ new ArrayList<>(
+ Arrays.asList(
+ "1, One, Alice, 3.202, red, {\"key1\":
\"value1\"}, null, null, null, null, null, null, null, null, null, null, null",
+ "2, Two, Bob, 1.703, white, {\"key2\":
\"value2\"}, null, null, null, null, null, null, null, null, null, null, null",
+ "3, Three, Cecily, 4.105, red, {\"key3\":
\"value3\"}, null, null, null, null, null, null, null, null, null, null, null",
+ "4, Four, Derrida, 1.857, white, {\"key4\":
\"value4\"}, null, null, null, null, null, null, null, null, null, null, null",
+ "5, Five, Evelyn, 5.211, red, {\"K\": \"V\",
\"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null,
null",
+ "6, Six, Ferris, 9.813, null, null, null,
null, null, null, null, null, null, null, null, null, null",
+ "7, Seven, Grace, 2.117, null, null, null,
null, null, null, null, null, null, null, null, null, null",
+ "8, Eight, Hesse, 6.819, null, null, null,
null, null, null, null, null, null, null, null, null, null",
+ "9, Nine, IINA, 5.223, null, null, null, null,
null, null, null, null, null, null, null, null, null",
+ "10, Ten, Jukebox, 0.2, null, null, null,
null, null, null, null, null, null, null, null, null, null",
+ "11, Eleven, Kryo, 5.18, null, null, null,
null, null, null, null, null, null, null, null, null, null",
+ "12, Twelve, Lily, 2.14, null, null, null,
null, null, null, null, null, null, null, null, null, null"));
+ recordsInSnapshotPhase.addAll(recordsInIncrementalPhase);
+ validateSinkResult(warehouse, database, "appendOnlySource",
recordsInSnapshotPhase);
+ }
+
+ /**
+ * Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL,
description VARCHAR(512),
+ * weight FLOAT, enum_c enum('red', 'white'), json_c JSON, point_c POINT.
+ */
+ private List<String> createChangesAndValidateForAppendOnlyTable(Statement
stat)
+ throws SQLException {
+ List<String> result = new ArrayList<>();
+ StringBuilder sqlFields = new StringBuilder();
+ int id = 13;
+
+ // Add Column.
+ for (int addColumnRepeat = 0; addColumnRepeat < 10; addColumnRepeat++)
{
+ stat.execute(
+ String.format(
+ "ALTER TABLE appendOnlySource ADD COLUMN
point_c_%s VARCHAR(10);",
+ addColumnRepeat));
+ sqlFields.append(", '1'");
+ StringBuilder resultFields = new StringBuilder();
+ for (int j = 0; j < 10; j++) {
+ if (j <= addColumnRepeat) {
+ resultFields.append(", 1");
+ } else {
+ resultFields.append(", null");
+ }
+ }
+ for (int j = 0; j < 1000; j++) {
+ stat.addBatch(
+ String.format(
+ "INSERT INTO appendOnlySource VALUES
(%s,'finally', null, 2.14, null, null, null %s);",
+ id, sqlFields));
+ result.add(
+ String.format(
+ "%s, finally, null, 2.14, null, null, null%s",
id, resultFields));
+ id++;
+ }
+ stat.executeBatch();
+ }
+
+ // Modify Column type.
+ for (int modifyColumnRepeat = 0; modifyColumnRepeat < 10;
modifyColumnRepeat++) {
+ for (int j = 0; j < 1000; j++) {
+ stat.addBatch(
+ String.format(
+ "INSERT INTO appendOnlySource VALUES
(%s,'finally', null, 2.14, null, null, null %s);",
+ id, sqlFields));
+ result.add(
+ String.format(
+ "%s, finally, null, 2.14, null, null, null%s",
+ id, ", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1"));
+ id++;
+ }
+ stat.executeBatch();
+ stat.execute(
+ String.format(
+ "ALTER TABLE appendOnlySource MODIFY point_c_0
VARCHAR(%s);",
+ 10 + modifyColumnRepeat));
+ }
+ return result;
+ }
+
/**
* Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL,
description VARCHAR(512),
* weight FLOAT, enum_c enum('red', 'white'), json_c JSON.