This is an automated email from the ASF dual-hosted git repository. yux pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 42f91a864 [tests][pipeline-connector/fluss] Add MySQL to Fluss E2e IT case (#4057) 42f91a864 is described below commit 42f91a864e329c00959828fe0ca4f1e9e8e1de75 Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Wed Aug 6 15:59:34 2025 +0800 [tests][pipeline-connector/fluss] Add MySQL to Fluss E2e IT case (#4057) * [ci][fluss] Add MySQL to Fluss E2e IT case Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com> * add: comments Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com> --------- Signed-off-by: yuxiqian <34335406+yuxiq...@users.noreply.github.com> --- .../flink-cdc-pipeline-e2e-tests/pom.xml | 19 ++ .../flink/cdc/pipeline/tests/FlussE2eITCase.java | 325 +++++++++++++++++++++ .../tests/utils/PipelineTestEnvironment.java | 19 +- .../test/resources/ddl/mysql_inventory_wo_pk.sql | 54 ++++ .../src/test/resources/docker/peek-fluss.sql | 32 ++ 5 files changed, 448 insertions(+), 1 deletion(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 521d2ba2d..353c0f1fb 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -42,6 +42,7 @@ limitations under the License. <maven.plugin.download.version>1.6.8</maven.plugin.download.version> <iceberg.version>1.6.1</iceberg.version> <hive.version>2.3.9</hive.version> + <fluss.version>0.7.0</fluss.version> </properties> <dependencies> @@ -602,6 +603,24 @@ limitations under the License. <outputDirectory>${project.build.directory}/dependencies </outputDirectory> </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-pipeline-connector-fluss</artifactId> + <version>${project.version}</version> + <destFileName>fluss-cdc-pipeline-connector.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> + <artifactItem> + <groupId>com.alibaba.fluss</groupId> + <artifactId>fluss-flink-${flink.major.version}</artifactId> + <version>${fluss.version}</version> + <destFileName>fluss-sql-connector.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> <artifactItem> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java new file mode 100644 index 000000000..c20808fa3 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/FlussE2eITCase.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** An End-to-end test case for Fluss pipeline connector. */ +@Testcontainers +public class FlussE2eITCase extends PipelineTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(FlussE2eITCase.class); + private static final Duration FLUSS_TESTCASE_TIMEOUT = Duration.ofMinutes(3); + private static final String flussImageTag = "fluss/fluss:0.7.0"; + private static final String zooKeeperImageTag = "zookeeper:3.9.2"; + + private static final List<String> flussCoordinatorProperties = + Arrays.asList( + "zookeeper.address: zookeeper:2181", + "bind.listeners: INTERNAL://coordinator-server:0, CLIENT://coordinator-server:9123", + "internal.listener.name: INTERNAL", + "remote.data.dir: /tmp/fluss/remote-data", + "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT", + "security.sasl.enabled.mechanisms: PLAIN", + "security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";", + "super.users: User:admin"); + + private static final List<String> flussTabletServerProperties = + Arrays.asList( + "zookeeper.address: zookeeper:2181", + "bind.listeners: INTERNAL://tablet-server:0, CLIENT://tablet-server:9123", + "internal.listener.name: INTERNAL", + "tablet-server.id: 0", + "kv.snapshot.interval: 0s", + "data.dir: /tmp/fluss/data", + "remote.data.dir: /tmp/fluss/remote-data", + "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT", + "security.sasl.enabled.mechanisms: PLAIN", + "security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";", + "super.users: User:admin"); + + @Container + private static final GenericContainer<?> ZOOKEEPER = + new GenericContainer<>(zooKeeperImageTag) + .withNetworkAliases("zookeeper") + .withExposedPorts(2181) + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @Container + private static final GenericContainer<?> FLUSS_COORDINATOR = + new GenericContainer<>(flussImageTag) + .withEnv( + ImmutableMap.of( + "FLUSS_PROPERTIES", + String.join("\n", flussCoordinatorProperties))) + .withCommand("coordinatorServer") + .withNetworkAliases("coordinator-server") + .withExposedPorts(9123) + .withNetwork(NETWORK) + .dependsOn(ZOOKEEPER) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @Container + private static final GenericContainer<?> FLUSS_TABLET_SERVER = + new GenericContainer<>(flussImageTag) + .withEnv( + ImmutableMap.of( + "FLUSS_PROPERTIES", + String.join("\n", flussTabletServerProperties))) + .withCommand("tabletServer") + .withNetworkAliases("tablet-server") + .withExposedPorts(9123) + .withNetwork(NETWORK) + .dependsOn(ZOOKEEPER, FLUSS_COORDINATOR) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase inventoryDatabaseWithPrimaryKey = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + protected final UniqueDatabase inventoryDatabaseWithoutPrimaryKey = + new UniqueDatabase( + MYSQL, "mysql_inventory_wo_pk", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @Override + protected List<String> copyJarToFlinkLib() { + // Due to a bug described in https://github.com/apache/fluss/pull/1267, it's not viable to + // pass Fluss dependency with `--jar` CLI option. We may remove this workaround and use + // `submitPipelineJob` to carry extra jar later. + return Collections.singletonList("fluss-sql-connector.jar"); + } + + @BeforeEach + public void before() throws Exception { + super.before(); + inventoryDatabaseWithPrimaryKey.createAndInitialize(); + inventoryDatabaseWithoutPrimaryKey.createAndInitialize(); + } + + @AfterEach + public void after() { + super.after(); + inventoryDatabaseWithPrimaryKey.dropDatabase(); + inventoryDatabaseWithoutPrimaryKey.dropDatabase(); + } + + @ParameterizedTest(name = "PkTable: {0}") + @ValueSource(booleans = {true, false}) + void testMySqlToFluss(boolean hasPrimaryKey) throws Exception { + UniqueDatabase inventoryDatabase = + hasPrimaryKey + ? inventoryDatabaseWithPrimaryKey + : inventoryDatabaseWithoutPrimaryKey; + String database = inventoryDatabase.getDatabaseName(); + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + " scan.incremental.snapshot.chunk.key-column: %s.\\.*:id\n" + + "\n" + + "sink:\n" + + " type: fluss\n" + + " bootstrap.servers: coordinator-server:9123\n" + + " properties.client.security.protocol: sasl\n" + + " properties.client.security.sasl.mechanism: PLAIN\n" + + " properties.client.security.sasl.username: developer\n" + + " properties.client.security.sasl.password: developer-pass\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + database, + database, + parallelism); + Path flussConnector = TestUtils.getResource("fluss-cdc-pipeline-connector.jar"); + submitPipelineJob(pipelineJob, flussConnector); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + validateSinkResult( + database, + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 16oz carpenter's hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.3, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null")); + + validateSinkResult( + database, + "customers", + Arrays.asList( + "101, user_1, Shanghai, 123567891234", + "102, user_2, Shanghai, 123567891234", + "103, user_3, Shanghai, 123567891234", + "104, user_4, Shanghai, 123567891234")); + + if (!hasPrimaryKey) { + // Non-primary key does not support deleting rows for now. + return; + } + + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + inventoryDatabase.getDatabaseName()); + + // Fluss does not support applying DDL events for now. + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + stat.execute("DELETE FROM products WHERE id=111;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistant white wind breaker', 0.2, null, null, null);"); + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter', 5.18, null, null, null);"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + validateSinkResult( + database, + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null", + "110, jacket, water resistant white wind breaker, 0.2, null, null, null", + "111, scooter, Big 2-wheel scooter, 5.18, null, null, null")); + } + + private List<String> fetchFlussTableRows(String database, String table, int rowCount) + throws Exception { + String template = + readLines("docker/peek-fluss.sql").stream() + .filter(line -> !line.startsWith("--")) + .collect(Collectors.joining("\n")); + String sql = String.format(template, database, table, rowCount); + String containerSqlPath = sharedVolume.toString() + "/peek.sql"; + jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath); + + org.testcontainers.containers.Container.ExecResult result = + jobManager.execInContainer("/opt/flink/bin/sql-client.sh", "-f", containerSqlPath); + if (result.getExitCode() != 0) { + throw new RuntimeException( + "Failed to execute peek script. Stdout: " + + result.getStdout() + + "; Stderr: " + + result.getStderr()); + } + + return Arrays.stream(result.getStdout().split("\n")) + .filter(line -> line.startsWith("|")) + .skip(1) + .map(FlussE2eITCase::extractRow) + .map(row -> String.format("%s", String.join(", ", row))) + .collect(Collectors.toList()); + } + + private static String[] extractRow(String row) { + return Arrays.stream(row.split("\\|")) + .map(String::trim) + .filter(col -> !col.isEmpty()) + .map(col -> col.equals("<NULL>") ? "null" : col) + .toArray(String[]::new); + } + + private void validateSinkResult(String database, String table, List<String> expected) + throws InterruptedException { + LOG.info("Verifying Fluss {}::{} results...", database, table); + long deadline = System.currentTimeMillis() + FLUSS_TESTCASE_TIMEOUT.toMillis(); + List<String> results = Collections.emptyList(); + int rowCount = expected.size(); + while (System.currentTimeMillis() < deadline) { + try { + results = fetchFlussTableRows(database, table, rowCount); + Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected); + LOG.info( + "Successfully verified {} records in {} seconds.", + expected.size(), + (System.currentTimeMillis() - deadline + FLUSS_TESTCASE_TIMEOUT.toMillis()) + / 1000); + return; + } catch (Exception e) { + LOG.warn("Validate failed, waiting for the next loop...", e); + } catch (AssertionError ignored) { + // AssertionError contains way too much records and might flood the log output. + LOG.warn( + "Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...", + expected.size(), + results.size()); + } + Thread.sleep(1000L); + } + Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index 279be44d3..6e8bac917 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -64,6 +64,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -138,7 +139,10 @@ public abstract class PipelineTestEnvironment extends TestLogger { "state.backend.type: hashmap", "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false", "execution.checkpointing.savepoint-dir: file:///opt/flink", - "restart-strategy.type: off"); + "restart-strategy.type: off", + // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct + // buffer memory" error. + "taskmanager.memory.task.off-heap.size: 128mb"); public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS); @Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient; @@ -162,6 +166,10 @@ public abstract class PipelineTestEnvironment extends TestLogger { return flinkVersion; } + protected List<String> copyJarToFlinkLib() { + return Collections.emptyList(); + } + @BeforeEach public void before() throws Exception { LOG.info("Starting containers..."); @@ -175,6 +183,15 @@ public abstract class PipelineTestEnvironment extends TestLogger { .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume)) .withLogConsumer(jobManagerConsumer); + + List<String> jarToCopy = copyJarToFlinkLib(); + if (!jarToCopy.isEmpty()) { + for (String jar : jarToCopy) { + jobManager.withCopyFileToContainer( + MountableFile.forHostPath(TestUtils.getResource(jar)), "/opt/flink/lib/"); + } + } + Startables.deepStart(Stream.of(jobManager)).join(); runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString()); LOG.info("JobManager is started."); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory_wo_pk.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory_wo_pk.sql new file mode 100644 index 000000000..01fc0d123 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory_wo_pk.sql @@ -0,0 +1,54 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: mysql_inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c enum('red', 'white') default 'red', -- test some complex types as well, + json_c JSON, -- because we use additional dependencies to deserialize complex types. + point_c POINT +); + +INSERT INTO products +VALUES (101,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')), + (102,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')), + (103,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')), + (104,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')), + (105,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')), + (106,"hammer","16oz carpenter's hammer",1.0, null, null, null), + (107,"rocks","box of assorted rocks",5.3, null, null, null), + (108,"jacket","water resistent black wind breaker",0.1, null, null, null), + (109,"spare tire","24 inch spare tire",22.2, null, null, null); + +-- Create and populate our customers using a single insert with many rows +CREATE TABLE customers ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (104,"user_4","Shanghai","123567891234"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/peek-fluss.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/peek-fluss.sql new file mode 100644 index 000000000..fd7600c18 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/peek-fluss.sql @@ -0,0 +1,32 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- Format this file with the following arguments: +-- Database Name, Table Name, and desired row count. + +SET 'execution.runtime-mode' = 'batch'; +SET 'table.display.max-column-width' = '100000'; +SET 'sql-client.execution.result-mode' = 'tableau'; + +CREATE CATALOG fluss_catalog WITH ( + 'type' = 'fluss', + 'bootstrap.servers' = 'coordinator-server:9123', + 'client.security.protocol' = 'sasl', + 'client.security.sasl.mechanism' = 'PLAIN', + 'client.security.sasl.username' = 'developer', + 'client.security.sasl.password' = 'developer-pass' +); + +SELECT * FROM fluss_catalog.%s.%s LIMIT %d; \ No newline at end of file