This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch security-issue-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 03e9349282f20b14dce170190ac5df64e7a37921
Author: lvyanquan <lvyanquan....@alibaba-inc.com>
AuthorDate: Tue Jul 29 20:35:47 2025 +0800

    [hotfix] Fix CI.
---
 .../pipeline/tests/MySqlToIcebergE2eITCase.java    | 383 ---------------------
 1 file changed, 383 deletions(-)

diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
deleted file mode 100644
index c63c54ad6..000000000
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.pipeline.tests;
-
-import org.apache.flink.cdc.common.test.utils.TestUtils;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
-import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
-import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
-import org.apache.flink.cdc.pipeline.tests.utils.TarballFetcher;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.output.ToStringConsumer;
-import org.testcontainers.lifecycle.Startables;
-
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** End-to-end tests for mysql cdc to Iceberg pipeline job. */
-public class MySqlToIcebergE2eITCase extends PipelineTestEnvironment {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MySqlToIcebergE2eITCase.class);
-
-    @TempDir public Path temporaryFolder;
-
-    @org.testcontainers.junit.jupiter.Container
-    public static final MySqlContainer MYSQL =
-            (MySqlContainer)
-                    new MySqlContainer(
-                                    MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
-                            .withConfigurationOverride("docker/mysql/my.cnf")
-                            .withSetupSQL("docker/mysql/setup.sql")
-                            .withDatabaseName("flink-test")
-                            .withUsername("flinkuser")
-                            .withPassword("flinkpw")
-                            .withNetwork(NETWORK)
-                            .withNetworkAliases("mysql")
-                            .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-    protected final UniqueDatabase inventoryDatabase =
-            new UniqueDatabase(MYSQL, "iceberg_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
-
-    private String warehouse;
-
-    @BeforeAll
-    public static void initializeContainers() {
-        LOG.info("Starting containers...");
-        Startables.deepStart(Stream.of(MYSQL)).join();
-        LOG.info("Containers are started.");
-    }
-
-    @BeforeEach
-    public void before() throws Exception {
-        LOG.info("Starting containers...");
-        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx");
-        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-        warehouse =
-                Files.createDirectory(temporaryFolder.resolve(UUID.randomUUID().toString()), attr)
-                        .toString();
-        jobManagerConsumer = new ToStringConsumer();
-        jobManager =
-                new GenericContainer<>(getFlinkDockerImageTag())
-                        .withCommand("jobmanager")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
-                        .withExposedPorts(JOB_MANAGER_REST_PORT)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                        .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume))
-                        .withFileSystemBind(warehouse, warehouse, BindMode.READ_WRITE)
-                        .withLogConsumer(jobManagerConsumer);
-        Startables.deepStart(Stream.of(jobManager)).join();
-        LOG.info("JobManager is started.");
-        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString());
-        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", warehouse);
-
-        taskManagerConsumer = new ToStringConsumer();
-        taskManager =
-                new GenericContainer<>(getFlinkDockerImageTag())
-                        .withCommand("taskmanager")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                        .dependsOn(jobManager)
-                        .withVolumesFrom(jobManager, BindMode.READ_WRITE)
-                        .withFileSystemBind(warehouse, warehouse, BindMode.READ_WRITE)
-                        .withLogConsumer(taskManagerConsumer);
-        Startables.deepStart(Stream.of(taskManager)).join();
-        LOG.info("TaskManager is started.");
-        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
-        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", warehouse);
-        inventoryDatabase.createAndInitialize();
-
-        TarballFetcher.fetchLatest(jobManager);
-        LOG.info("CDC executables deployed.");
-    }
-
-    @AfterEach
-    public void after() {
-        try {
-            super.after();
-            inventoryDatabase.dropDatabase();
-        } catch (Exception e) {
-            LOG.error("Failed to clean up resources", e);
-        }
-    }
-
-    @Test
-    public void testSyncWholeDatabase() throws Exception {
-        String database = inventoryDatabase.getDatabaseName();
-        String pipelineJob =
-                String.format(
-                        "source:\n"
-                                + "  type: mysql\n"
-                                + "  hostname: mysql\n"
-                                + "  port: 3306\n"
-                                + "  username: %s\n"
-                                + "  password: %s\n"
-                                + "  tables: %s.\\.*\n"
-                                + "  server-id: 5400-5404\n"
-                                + "  server-time-zone: UTC\n"
-                                + "\n"
-                                + "sink:\n"
-                                + "  type: iceberg\n"
-                                + "  catalog.properties.warehouse: %s\n"
-                                + "  catalog.properties.type: hadoop\n"
-                                + "\n"
-                                + "pipeline:\n"
-                                + "  schema.change.behavior: evolve\n"
-                                + "  parallelism: %s",
-                        MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, warehouse, parallelism);
-        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
-        Path icebergCdcConnector = TestUtils.getResource("iceberg-cdc-pipeline-connector.jar");
-        Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
-        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
-        submitPipelineJob(pipelineJob, mysqlCdcJar, icebergCdcConnector, mysqlDriverJar, hadoopJar);
-        waitUntilJobRunning(Duration.ofSeconds(60));
-        LOG.info("Pipeline job is running");
-        validateSinkResult(
-                warehouse,
-                database,
-                "products",
-                Arrays.asList(
-                        "101, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null",
-                        "102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null",
-                        "103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null",
-                        "104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null",
-                        "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null",
-                        "106, Six, Ferris, 9.813, null, null, null",
-                        "107, Seven, Grace, 2.117, null, null, null",
-                        "108, Eight, Hesse, 6.819, null, null, null",
-                        "109, Nine, IINA, 5.223, null, null, null"));
-
-        validateSinkResult(
-                warehouse,
-                database,
-                "customers",
-                Arrays.asList(
-                        "101, user_1, Shanghai, 123567891234",
-                        "102, user_2, Shanghai, 123567891234",
-                        "103, user_3, Shanghai, 123567891234",
-                        "104, user_4, Shanghai, 123567891234"));
-
-        LOG.info("Begin incremental reading stage.");
-        // generate binlogs
-        String mysqlJdbcUrl =
-                String.format(
-                        "jdbc:mysql://%s:%s/%s",
-                        MYSQL.getHost(), MYSQL.getDatabasePort(), database);
-        List<String> recordsInIncrementalPhase;
-        try (Connection conn =
-                        DriverManager.getConnection(
-                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
-                Statement stat = conn.createStatement()) {
-
-            stat.execute(
-                    "INSERT INTO products VALUES (default,'Ten','Jukebox',0.2, null, null, null);"); // 110
-            stat.execute("UPDATE products SET description='Fay' WHERE id=106;");
-            stat.execute("UPDATE products SET weight='5.125' WHERE id=107;");
-
-            // modify table schema
-            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
-            stat.execute("DELETE FROM products WHERE id=101;");
-
-            stat.execute(
-                    "INSERT INTO products VALUES (default,'Eleven','Kryo',5.18, null, null);"); // 111
-            stat.execute(
-                    "INSERT INTO products VALUES (default,'Twelve', 'Lily', 2.14, null, null);"); // 112
-            recordsInIncrementalPhase = createChangesAndValidate(stat);
-        } catch (SQLException e) {
-            LOG.error("Update table for CDC failed.", e);
-            throw e;
-        }
-        List<String> recordsInSnapshotPhase =
-                new ArrayList<>(
-                        Arrays.asList(
-                                "102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, null, null, null, null, null, null, null, null, null",
-                                "103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, null, null, null, null, null, null, null, null, null",
-                                "104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, null, null, null, null, null, null, null, null, null",
-                                "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null",
-                                "106, Six, Fay, 9.813, null, null, null, null, null, null, null, null, null, null, null, null",
-                                "107, Seven, Grace, 5.125, null, null, null, null, null, null, null, null, null, null, null, null",
-                                "108, Eight, Hesse, 6.819, null, null, null, null, null, null, null, null, null, null, null, null",
-                                "109, Nine, IINA, 5.223, null, null, null, null, null, null, null, null, null, null, null, null",
-                                "110, Ten, Jukebox, 0.2, null, null, null, null, null, null, null, null, null, null, null, null",
-                                "111, Eleven, Kryo, 5.18, null, null, null, null, null, null, null, null, null, null, null, null",
-                                "112, Twelve, Lily, 2.14, null, null, null, null, null, null, null, null, null, null, null, null"));
-        recordsInSnapshotPhase.addAll(recordsInIncrementalPhase);
-        recordsInSnapshotPhase =
-                recordsInSnapshotPhase.stream().sorted().collect(Collectors.toList());
-        validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
-    }
-
-    /**
-     * Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512),
-     * weight FLOAT, enum_c enum('red', 'white'), json_c JSON.
-     */
-    private List<String> createChangesAndValidate(Statement stat) throws SQLException {
-        List<String> result = new ArrayList<>();
-        StringBuilder sqlFields = new StringBuilder();
-
-        // Add Column.
-        for (int addColumnRepeat = 0; addColumnRepeat < 10; addColumnRepeat++) {
-            stat.execute(
-                    String.format(
-                            "ALTER TABLE products ADD COLUMN point_c_%s VARCHAR(10);",
-                            addColumnRepeat));
-            sqlFields.append(", '1'");
-            StringBuilder resultFields = new StringBuilder();
-            for (int addedFieldCount = 0; addedFieldCount < 10; addedFieldCount++) {
-                if (addedFieldCount <= addColumnRepeat) {
-                    resultFields.append(", 1");
-                } else {
-                    resultFields.append(", null");
-                }
-            }
-            for (int statementCount = 0; statementCount < 1000; statementCount++) {
-                stat.addBatch(
-                        String.format(
-                                "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null %s);",
-                                sqlFields));
-                int id = addColumnRepeat * 1000 + statementCount + 113;
-                result.add(
-                        String.format("%s, finally, null, 2.14, null, null%s", id, resultFields));
-            }
-            stat.executeBatch();
-        }
-
-        // Modify Column type.
-        for (int modifyColumnRepeat = 0; modifyColumnRepeat < 10; modifyColumnRepeat++) {
-            for (int statementCount = 0; statementCount < 1000; statementCount++) {
-                stat.addBatch(
-                        String.format(
-                                "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null %s);",
-                                sqlFields));
-                int id = modifyColumnRepeat * 1000 + statementCount + 10113;
-                result.add(
-                        String.format(
-                                "%s, finally, null, 2.14, null, null%s",
-                                id, ", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1"));
-            }
-            stat.executeBatch();
-            stat.execute(
-                    String.format(
-                            "ALTER TABLE products MODIFY point_c_0 VARCHAR(%s);",
-                            10 + modifyColumnRepeat));
-        }
-
-        return result;
-    }
-
-    private List<String> fetchIcebergTableRows(
-            String warehouse, String databaseName, String tableName) throws Exception {
-        Map<String, String> catalogOptions = new HashMap<>();
-        catalogOptions.put("type", "hadoop");
-        catalogOptions.put("warehouse", warehouse);
-        catalogOptions.put("cache-enabled", "false");
-        Catalog catalog =
-                CatalogUtil.buildIcebergCatalog(
-                        "cdc-iceberg-catalog", catalogOptions, new Configuration());
-        List<String> results = new ArrayList<>();
-        Table table = catalog.loadTable(TableIdentifier.of(databaseName, tableName));
-        org.apache.iceberg.Schema schema = table.schema();
-        CloseableIterable<Record> records = IcebergGenerics.read(table).project(schema).build();
-        for (Record record : records) {
-            List<String> fieldValues = new ArrayList<>();
-            for (Types.NestedField field : schema.columns()) {
-                String fieldValue = Objects.toString(record.getField(field.name()), "null");
-                fieldValues.add(fieldValue);
-            }
-            String joinedString = String.join(", ", fieldValues);
-            results.add(joinedString);
-        }
-        return results;
-    }
-
-    private void validateSinkResult(
-            String warehouse, String database, String table, List<String> expected)
-            throws InterruptedException {
-        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", warehouse);
-        LOG.info("Verifying Iceberg {}::{}::{} results...", warehouse, database, table);
-        long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
-        List<String> results = Collections.emptyList();
-        while (System.currentTimeMillis() < deadline) {
-            try {
-                results = fetchIcebergTableRows(warehouse, database, table);
-                results = results.stream().sorted().collect(Collectors.toList());
-                for (int recordIndex = 0; recordIndex < results.size(); recordIndex++) {
-                    Assertions.assertThat(results.get(recordIndex))
-                            .isEqualTo(expected.get(recordIndex));
-                }
-                LOG.info(
-                        "Successfully verified {} records in {} seconds.",
-                        expected.size(),
-                        (System.currentTimeMillis() - deadline + EVENT_WAITING_TIMEOUT.toMillis())
                                / 1000);
-                return;
-            } catch (Exception e) {
-                LOG.warn("Validate failed, waiting for the next loop...", e);
-            } catch (AssertionError ignored) {
-                // AssertionError contains way too much records and might flood the log output.
-                LOG.warn(
-                        "Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...",
-                        expected.size(),
-                        results.size());
-            }
-            Thread.sleep(10000L);
-        }
-        Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
-    }
-}