This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fafe05a382 [INLONG-8999][Sort] Add Sqlserver connector on flink 1.15
(#9008)
fafe05a382 is described below
commit fafe05a38276c2a8c264c976eed0427a133fa50f
Author: EpicMo <[email protected]>
AuthorDate: Mon Oct 9 18:46:16 2023 +0800
[INLONG-8999][Sort] Add Sqlserver connector on flink 1.15 (#9008)
---
.../src/main/assemblies/sort-connectors-v1.15.xml | 8 +
inlong-sort/sort-core/pom.xml | 6 +
.../sort-end-to-end-tests-v1.15/pom.xml | 22 +++
.../apache/inlong/sort/tests/MysqlToRocksTest.java | 3 +-
...ocksTest.java => SqlserverToStarRocksTest.java} | 100 +++++++----
.../sort/tests/utils/MSSQLServerContainer.java | 148 ++++++++++++++++
.../src/test/resources/docker/sqlserver/setup.sql | 26 +++
.../src/test/resources/flinkSql/sqlserver_test.sql | 36 ++++
.../sort-flink-v1.15/sort-connectors/pom.xml | 1 +
.../sort-connectors/sqlserver-cdc/pom.xml | 133 +++++++++++++++
.../sort/sqlserver/SqlserverTableFactory.java | 190 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 ++
licenses/inlong-sort-connectors/LICENSE | 5 +
.../LICENSE-testcontainers-java-mssqlserver.txt | 21 +++
pom.xml | 1 +
15 files changed, 678 insertions(+), 38 deletions(-)
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 184e727a91..a91ea91e24 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -51,6 +51,14 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-sqlserver-cdc-v1.15-${project.version}.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
<fileSet>
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/target</directory>
<outputDirectory>inlong-sort/connectors</outputDirectory>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index b18604b67d..8e78f08a1c 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -257,6 +257,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-sqlserver-cdc-v1.15</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 6841331770..37244cf14f 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -52,6 +52,12 @@
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <version>12.4.1.jre8</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
@@ -157,6 +163,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <version>${mssql.jdbc.version}</version>
+ <destFileName>mssql-driver.jar</destFileName>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </dependency>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-postgres-cdc-v1.15</artifactId>
@@ -181,6 +195,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.inlong</groupId>
+
<artifactId>sort-connector-sqlserver-cdc-v1.15</artifactId>
+ <version>${project.version}</version>
+
<destFileName>sort-connector-sqlserver-cdc.jar</destFileName>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
</artifactItems>
</configuration>
<executions>
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
index e02501cfd1..0b777cf429 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
@@ -47,6 +47,7 @@ import static
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINE
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
@@ -65,7 +66,7 @@ public class MysqlToRocksTest extends FlinkContainerTestEnv {
try {
sqlFile =
Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
- StarRocksManager.buildStarRocksImage();
+ buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksTest.java
similarity index 53%
copy from
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
copy to
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksTest.java
index e02501cfd1..01ab9eadb1 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksTest.java
@@ -19,9 +19,8 @@ package org.apache.inlong.sort.tests;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
-import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.MSSQLServerContainer;
import org.apache.inlong.sort.tests.utils.StarRocksContainer;
-import org.apache.inlong.sort.tests.utils.StarRocksManager;
import org.apache.inlong.sort.tests.utils.TestUtils;
import org.junit.AfterClass;
@@ -31,6 +30,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
import java.net.URISyntaxException;
import java.nio.file.Path;
@@ -47,25 +47,24 @@ import static
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINE
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+import static
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
-/**
- * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
- * Test flink sql Mysql cdc to StarRocks
- */
-public class MysqlToRocksTest extends FlinkContainerTestEnv {
+public class SqlserverToStarRocksTest extends FlinkContainerTestEnv {
- private static final Logger LOG =
LoggerFactory.getLogger(MysqlToRocksTest.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(SqlserverToStarRocksTest.class);
- private static final Path mysqlJar =
TestUtils.getResource("sort-connector-mysql-cdc.jar");
+ private static final Path sqlserverJar =
TestUtils.getResource("sort-connector-sqlserver-cdc.jar");
private static final Path jdbcJar =
TestUtils.getResource("sort-connector-starrocks.jar");
- private static final Path mysqlJdbcJar =
TestUtils.getResource("mysql-driver.jar");
+
+ private static final Path mysqlJar =
TestUtils.getResource("mysql-driver.jar");
+
private static final String sqlFile;
static {
try {
- sqlFile =
-
Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
- StarRocksManager.buildStarRocksImage();
+ sqlFile =
Paths.get(SqlserverToStarRocksTest.class.getResource("/flinkSql/sqlserver_test.sql").toURI())
+ .toString();
+ buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
@@ -81,34 +80,55 @@ public class MysqlToRocksTest extends FlinkContainerTestEnv
{
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
@ClassRule
- public static final MySqlContainer MYSQL_CONTAINER =
- (MySqlContainer) new
MySqlContainer(MySqlContainer.MySqlVersion.V8_0)
- .withDatabaseName("test")
+ public static final MSSQLServerContainer SQLSERVER_CONTAINER =
(MSSQLServerContainer) new MSSQLServerContainer(
+
DockerImageName.parse("mcr.microsoft.com/mssql/server").withTag("2022-latest"))
+ .acceptLicense()
.withNetwork(NETWORK)
- .withNetworkAliases("mysql")
+ .withNetworkAliases("sqlserver")
.withLogConsumer(new Slf4jLogConsumer(LOG));
@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
- initializeMysqlTable();
+ initializeSqlserverTable();
initializeStarRocksTable(STAR_ROCKS);
}
- private void initializeMysqlTable() {
+ private void initializeSqlserverTable() {
try {
- Class.forName(MYSQL_CONTAINER.getDriverClassName());
+ // Waiting for MSSQL Agent started.
+ LOG.info("Sleep until the MSSQL Agent is started...");
+ Thread.sleep(20 * 1000);
+ LOG.info("Now continue initialize task...");
+ Class.forName(SQLSERVER_CONTAINER.getDriverClassName());
Connection conn = DriverManager
- .getConnection(MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
+ .getConnection(SQLSERVER_CONTAINER.getJdbcUrl(),
SQLSERVER_CONTAINER.getUsername(),
+ SQLSERVER_CONTAINER.getPassword());
Statement stat = conn.createStatement();
+ stat.execute("CREATE DATABASE test;");
+ stat.execute("USE test;");
stat.execute(
- "CREATE TABLE test_input1 (\n"
- + " id SERIAL,\n"
- + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
- + " description VARCHAR(512),\n"
- + " PRIMARY KEY(id)\n"
- + ");");
+ "CREATE TABLE test_input1 (\n" +
+ " id INT IDENTITY(1,1) NOT NULL PRIMARY KEY,\n"
+
+ " name NVARCHAR(255) NOT NULL DEFAULT
'flink',\n" +
+ " description NVARCHAR(512)\n" +
+ ");");
+ stat.execute("if exists(select 1 from sys.databases where
name='test' and is_cdc_enabled=0)\n" +
+ "begin\n" +
+ " exec sys.sp_cdc_enable_db\n" +
+ "end");
+ stat.execute("IF EXISTS(SELECT 1 FROM sys.tables WHERE
name='test_input1' AND is_tracked_by_cdc = 0)\n" +
+ "BEGIN\n" +
+ " EXEC sys.sp_cdc_enable_table\n" +
+ " @source_schema = 'dbo', -- source_schema\n" +
+ " @source_name = 'test_input1', -- table_name\n" +
+ " @capture_instance = NULL, -- capture_instance\n" +
+ " @supports_net_changes = '1', --
capture_instance\n" +
+ " @index_name = NULL, -- \n" +
+ " @captured_column_list = NULL, -- \n" +
+ " @filegroup_name = 'PRIMARY', -- \n" +
+ " @role_name = NULL -- role_name\n" +
+ "END");
stat.close();
conn.close();
} catch (Exception e) {
@@ -118,8 +138,8 @@ public class MysqlToRocksTest extends FlinkContainerTestEnv
{
@AfterClass
public static void teardown() {
- if (MYSQL_CONTAINER != null) {
- MYSQL_CONTAINER.stop();
+ if (SQLSERVER_CONTAINER != null) {
+ SQLSERVER_CONTAINER.stop();
}
if (STAR_ROCKS != null) {
STAR_ROCKS.stop();
@@ -132,20 +152,26 @@ public class MysqlToRocksTest extends
FlinkContainerTestEnv {
* @throws Exception The exception may throws when execute the case
*/
@Test
- public void testMysqlUpdateAndDelete() throws Exception {
- submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+ public void testSqlserverUpdateAndDelete() throws Exception {
+ submitSQLJob(sqlFile, jdbcJar, sqlserverJar, mysqlJar);
waitUntilJobRunning(Duration.ofSeconds(10));
// generate input
try (Connection conn =
- DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
- MYSQL_CONTAINER.getPassword());
+ DriverManager.getConnection(SQLSERVER_CONTAINER.getJdbcUrl(),
SQLSERVER_CONTAINER.getUsername(),
+ SQLSERVER_CONTAINER.getPassword());
Statement stat = conn.createStatement()) {
+ stat.execute("USE test;");
stat.execute(
- "INSERT INTO test_input1 "
- + "VALUES (1,'jacket','water resistent white wind
breaker');");
+ "SET IDENTITY_INSERT test_input1 ON;" +
+ "INSERT INTO test_input1 (id, name, description) "
+ + "VALUES (1, 'jacket','water resistent white wind
breaker');" +
+ "SET IDENTITY_INSERT test_input1 OFF;");
stat.execute(
- "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel
scooter ');");
+ "SET IDENTITY_INSERT test_input1 ON;" +
+ "INSERT INTO test_input1 (id, name, description) "
+
+ "VALUES (2,'scooter','Big 2-wheel scooter ');" +
+ "SET IDENTITY_INSERT test_input1 OFF;");
stat.execute(
"update test_input1 set name = 'tom' where id = 2;");
stat.execute(
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
new file mode 100644
index 0000000000..21cdf1b3f0
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import com.google.common.collect.Sets;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.LicenseAcceptance;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+/**
+ * Docker container for MSSQLServerContainer.
+ * Use {@link org.testcontainers.containers.MSSQLServerContainer} and edit
some settings for test.
+ * */
+public class MSSQLServerContainer extends JdbcDatabaseContainer {
+
+ private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("mcr.microsoft.com/mssql/server");
+ /** @deprecated */
+ @Deprecated
+ public static final String DEFAULT_TAG = "2017-CU12";
+ public static final String NAME = "sqlserver";
+ public static final String IMAGE;
+ public static final Integer MS_SQL_SERVER_PORT;
+ static final String DEFAULT_USER = "SA";
+ static final String DEFAULT_PASSWORD = "A_Str0ng_Required_Password";
+ private String password;
+ private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+ private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 240;
+ private static final Pattern[] PASSWORD_CATEGORY_VALIDATION_PATTERNS;
+
+ /** @deprecated */
+ @Deprecated
+ public MSSQLServerContainer() {
+ this(DEFAULT_IMAGE_NAME.withTag("2017-CU12"));
+ }
+
+ public MSSQLServerContainer(String dockerImageName) {
+ this(DockerImageName.parse(dockerImageName));
+ }
+
+ public MSSQLServerContainer(DockerImageName dockerImageName) {
+ super(dockerImageName);
+ this.password = "A_Str0ng_Required_Password";
+ dockerImageName.assertCompatibleWith(new
DockerImageName[]{DEFAULT_IMAGE_NAME});
+ this.withStartupTimeoutSeconds(240);
+ this.withConnectTimeoutSeconds(240);
+ this.addExposedPort(MS_SQL_SERVER_PORT);
+ }
+
+ public Set<Integer> getLivenessCheckPortNumbers() {
+ return Sets.newHashSet(new Integer[]{MS_SQL_SERVER_PORT});
+ }
+
+ protected void configure() {
+ if (!this.getEnvMap().containsKey("ACCEPT_EULA")) {
+ LicenseAcceptance.assertLicenseAccepted(this.getDockerImageName());
+ this.acceptLicense();
+ }
+
+ this.addEnv("SA_PASSWORD", this.password);
+ this.addEnv("MSSQL_AGENT_ENABLED", "true");
+ this.addFixedExposedPort(14433, MS_SQL_SERVER_PORT);
+ }
+
+ public MSSQLServerContainer acceptLicense() {
+ this.addEnv("ACCEPT_EULA", "Y");
+ return this;
+ }
+
+ public String getDriverClassName() {
+ return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
+ }
+
+ protected String constructUrlForConnection(String queryString) {
+
+ if (urlParameters.keySet().stream().map(sp -> ((String)
sp).toLowerCase()).noneMatch("encrypt"::equals)) {
+ urlParameters.put("encrypt", "false");
+ }
+ return super.constructUrlForConnection(queryString);
+ }
+
+ public String getJdbcUrl() {
+ String additionalUrlParams = this.constructUrlParameters(";", ";");
+ return "jdbc:sqlserver://" + this.getHost() + ":" +
this.getMappedPort(MS_SQL_SERVER_PORT)
+ + additionalUrlParams;
+ }
+
+ public String getUsername() {
+ return "SA";
+ }
+
+ public String getPassword() {
+ return this.password;
+ }
+
+ public String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ public MSSQLServerContainer withPassword(String password) {
+ this.checkPasswordStrength(password);
+ this.password = password;
+ return this;
+ }
+
+ private void checkPasswordStrength(String password) {
+ if (password == null) {
+ throw new IllegalArgumentException("Null password is not allowed");
+ } else if (password.length() < 8) {
+ throw new IllegalArgumentException("Password should be at least 8
characters long");
+ } else if (password.length() > 128) {
+ throw new IllegalArgumentException("Password can be up to 128
characters long");
+ } else {
+ long satisfiedCategories =
Stream.of(PASSWORD_CATEGORY_VALIDATION_PATTERNS).filter((p) -> {
+ return p.matcher(password).find();
+ }).count();
+ if (satisfiedCategories < 3L) {
+ throw new IllegalArgumentException(
+ "Password must contain characters from three of the
following four categories:\n - Latin uppercase letters (A through Z)\n - Latin
lowercase letters (a through z)\n - Base 10 digits (0 through 9)\n -
Non-alphanumeric characters such as: exclamation point (!), dollar sign ($),
number sign (#), or percent (%).");
+ }
+ }
+ }
+
+ static {
+ IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart();
+ MS_SQL_SERVER_PORT = 1433;
+ PASSWORD_CATEGORY_VALIDATION_PATTERNS = new
Pattern[]{Pattern.compile("[A-Z]+"), Pattern.compile("[a-z]+"),
+ Pattern.compile("[0-9]+"), Pattern.compile("[^a-zA-Z0-9]+",
2)};
+ }
+}
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/sqlserver/setup.sql
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/sqlserver/setup.sql
new file mode 100644
index 0000000000..aadbc1baae
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/sqlserver/setup.sql
@@ -0,0 +1,26 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- In production you would almost certainly limit the replication user must be
on the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For
example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog
reader (used for testing)
+-- 2) 'inlong' - all privileges
+--
+if exists(select 1 from sys.databases where name='master' and is_cdc_enabled=0)
+begin
+exec sys.sp_cdc_enable_db
+end
\ No newline at end of file
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/sqlserver_test.sql
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/sqlserver_test.sql
new file mode 100644
index 0000000000..8bbd55d736
--- /dev/null
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/sqlserver_test.sql
@@ -0,0 +1,36 @@
+CREATE TABLE test_input1 (
+ `id` INT,
+ name STRING,
+ description STRING,
+ PRIMARY KEY(`id`) NOT ENFORCED
+) WITH (
+ 'connector' = 'sqlserver-cdc-inlong',
+ 'hostname' = 'sqlserver',
+ 'port' = '1433',
+ 'username' = 'sa',
+ 'password' = 'A_Str0ng_Required_Password',
+ 'database-name' = 'test',
+ 'table-name' = 'test_input1',
+ 'schema-name' = 'dbo'
+);
+CREATE TABLE test_output1 (
+ `id` INT,
+ name STRING,
+ description STRING
+) WITH (
+ 'connector' = 'starrocks-inlong',
+ 'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+ 'load-url'='starrocks:8030',
+ 'database-name'='test',
+ 'table-name' = 'test_output1',
+ 'username' = 'inlong',
+ 'password' = 'inlong',
+ 'sink.properties.format' = 'json',
+ 'sink.properties.strip_outer_array' = 'true',
+ 'sink.buffer-flush.interval-ms' = '1000'
+);
+
+INSERT INTO test_output1 select * from test_input1;
+
+
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 73a81bf262..bf102894cb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
<modules>
<module>postgres-cdc</module>
<module>starrocks</module>
+ <module>sqlserver-cdc</module>
<module>mysql-cdc</module>
<module>iceberg</module>
</modules>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/pom.xml
new file mode 100644
index 0000000000..d0e49f7184
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.15</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-sqlserver-cdc-v1.15</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-sqlserver-cdc</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-sqlserver-cdc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-debezium</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-log4j-appender</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-clients.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+ <include>io.debezium:debezium-api</include>
+
<include>io.debezium:debezium-embedded</include>
+
<include>io.debezium:debezium-core</include>
+
<include>io.debezium:debezium-connector-sqlserver</include>
+ <include>com.ververica:*</include>
+
<include>com.microsoft.sqlserver:*</include>
+ <include>org.apache.kafka:*</include>
+ <include>com.fasterxml.*:*</include>
+ <include>com.google.guava:*</include>
+ <!-- Include fixed version 18.0-13.0 of
flink shaded guava -->
+
<include>org.apache.flink:flink-shaded-guava</include>
+ <include>com.google.protobuf:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+ </includes>
+ </filter>
+ <filter>
+ <artifact>org.apache.kafka:*</artifact>
+ <excludes>
+
<exclude>kafka/kafka-version.properties</exclude>
+ <exclude>LICENSE</exclude>
+ <!-- Does not contain anything
relevant.
+ Cites a binary dependency on
jersey, but this is neither reflected in the
+ dependency graph, nor are any
jersey files bundled. -->
+ <exclude>NOTICE</exclude>
+ <exclude>common/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+
<shadedPattern>org.apache.inlong.sort.shaded.org.apache.kafka</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+
<shadedPattern>org.apache.inlong.sort.shaded.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml</pattern>
+
<shadedPattern>org.apache.inlong.sort.shaded.com.fasterxml</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
new file mode 100644
index 0000000000..bab8511709
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.sqlserver;
+
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerTableSource;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Set;
+
+import static
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+
+/** Factory for creating configured instance of {@link
com.ververica.cdc.connectors.sqlserver.SqlServerSource}. */
+public class SqlserverTableFactory implements DynamicTableSourceFactory {
+
+ private static final String IDENTIFIER = "sqlserver-cdc-inlong";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ private static final ConfigOption<String> SCHEMA_NAME =
+ ConfigOptions.key("schema-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Schema name of the SqlServer database to
monitor.");
+
+ private static final ConfigOption<String> HOSTNAME =
+ ConfigOptions.key("hostname")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("IP address or hostname of the SqlServer
database server.");
+
+ private static final ConfigOption<Integer> PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(1433)
+ .withDescription("Integer port number of the SqlServer
database server.");
+
+ private static final ConfigOption<String> USERNAME =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the SqlServer database to use when
connecting to the SqlServer database server.");
+
+ private static final ConfigOption<String> PASSWORD =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Password to use when connecting to the SqlServer
database server.");
+
+ private static final ConfigOption<String> DATABASE_NAME =
+ ConfigOptions.key("database-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name of the SqlServer server to
monitor.");
+
+ private static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the SqlServer database to
monitor.");
+
+ public static final ConfigOption<String> SERVER_TIME_ZONE =
+ ConfigOptions.key("server-time-zone")
+ .stringType()
+ .defaultValue("UTC")
+ .withDescription("The session time zone in database
server.");
+
+ public static final ConfigOption<String> SCAN_STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
+ .stringType()
+ .defaultValue("initial")
+ .withDescription(
+ "Optional startup mode for SqlServer CDC consumer,
valid enumerations are "
+ + "\"initial\", \"initial-only\",
\"latest-offset\"");
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOSTNAME);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ options.add(DATABASE_NAME);
+ options.add(SCHEMA_NAME);
+ options.add(TABLE_NAME);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PORT);
+ options.add(SERVER_TIME_ZONE);
+ options.add(SCAN_STARTUP_MODE);
+
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+
+ final ReadableConfig config = helper.getOptions();
+ String hostname = config.get(HOSTNAME);
+ String username = config.get(USERNAME);
+ String password = config.get(PASSWORD);
+ String databaseName = config.get(DATABASE_NAME);
+ String tableName = config.get(TABLE_NAME);
+ String schemaName = config.get(SCHEMA_NAME);
+ ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+ int port = config.get(PORT);
+ ResolvedSchema physicalSchema =
+
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
+
+ return new SqlServerTableSource(
+ physicalSchema,
+ port,
+ hostname,
+ databaseName,
+ schemaName,
+ tableName,
+ serverTimeZone,
+ username,
+ password,
+ getDebeziumProperties(context.getCatalogTable().getOptions()),
+ getStartupOptions(config));
+ }
+
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+ private static final String SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY =
"initial-only";
+ private static final String SCAN_STARTUP_MODE_VALUE_LATEST =
"latest-offset";
+
+ private static com.ververica.cdc.connectors.sqlserver.table.StartupOptions
getStartupOptions(
+ ReadableConfig config) {
+ String modeString = config.get(SCAN_STARTUP_MODE);
+
+ switch (modeString.toLowerCase()) {
+ case SCAN_STARTUP_MODE_VALUE_INITIAL:
+ return
com.ververica.cdc.connectors.sqlserver.table.StartupOptions.initial();
+
+ case SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY:
+ return
com.ververica.cdc.connectors.sqlserver.table.StartupOptions.initialOnly();
+
+ case SCAN_STARTUP_MODE_VALUE_LATEST:
+ return StartupOptions.latest();
+
+ default:
+ throw new ValidationException(
+ String.format(
+ "Invalid value for option '%s'. Supported
values are [%s, %s, %s], but was: %s",
+ SCAN_STARTUP_MODE.key(),
+ SCAN_STARTUP_MODE_VALUE_INITIAL,
+ SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY,
+ SCAN_STARTUP_MODE_VALUE_LATEST,
+ modeString));
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..920275133f
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.sqlserver.SqlserverTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index e83548b61e..dbc5201855 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -1187,6 +1187,11 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
com.microsoft.sqlserver:mssql-jdbc:7.2.2.jre8 - Microsoft JDBC Driver for
SQL Server (https://github.com/Microsoft/mssql-jdbc/tree/v7.2.2), (MIT License)
org.slf4j:slf4j-api:1.7.36 - SLF4J API Module (http://www.slf4j.org), (MIT
License)
+Codes:
+ 1.
inlong-sort/sort-end-to-end-tests-v1.15/src/test/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
+
+ Source: testcontainers-java:mssqlserver:1.17
+ License:
https://github.com/testcontainers/testcontainers-java/blob/main/LICENSE
========================================================================
CDDL licenses
diff --git
a/licenses/inlong-sort-connectors/licenses/LICENSE-testcontainers-java-mssqlserver.txt
b/licenses/inlong-sort-connectors/licenses/LICENSE-testcontainers-java-mssqlserver.txt
new file mode 100644
index 0000000000..2e38890202
--- /dev/null
+++
b/licenses/inlong-sort-connectors/licenses/LICENSE-testcontainers-java-mssqlserver.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015-2019 Richard North
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 84cc943718..3bfb8d06f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
<postgresql.version>42.4.3</postgresql.version>
<oracle.jdbc.version>19.3.0.0</oracle.jdbc.version>
<mysql.jdbc.version>8.0.28</mysql.jdbc.version>
+ <mssql.jdbc.version>12.4.1.jre8</mssql.jdbc.version>
<sqlserver.jdbc.version>7.2.2.jre8</sqlserver.jdbc.version>
<mybatis.starter.version>2.1.3</mybatis.starter.version>
<mybatis.version>3.5.9</mybatis.version>