This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fafe05a382 [INLONG-8999][Sort] Add Sqlserver connector on flink 1.15 
(#9008)
fafe05a382 is described below

commit fafe05a38276c2a8c264c976eed0427a133fa50f
Author: EpicMo <[email protected]>
AuthorDate: Mon Oct 9 18:46:16 2023 +0800

    [INLONG-8999][Sort] Add Sqlserver connector on flink 1.15 (#9008)
---
 .../src/main/assemblies/sort-connectors-v1.15.xml  |   8 +
 inlong-sort/sort-core/pom.xml                      |   6 +
 .../sort-end-to-end-tests-v1.15/pom.xml            |  22 +++
 .../apache/inlong/sort/tests/MysqlToRocksTest.java |   3 +-
 ...ocksTest.java => SqlserverToStarRocksTest.java} | 100 +++++++----
 .../sort/tests/utils/MSSQLServerContainer.java     | 148 ++++++++++++++++
 .../src/test/resources/docker/sqlserver/setup.sql  |  26 +++
 .../src/test/resources/flinkSql/sqlserver_test.sql |  36 ++++
 .../sort-flink-v1.15/sort-connectors/pom.xml       |   1 +
 .../sort-connectors/sqlserver-cdc/pom.xml          | 133 +++++++++++++++
 .../sort/sqlserver/SqlserverTableFactory.java      | 190 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 licenses/inlong-sort-connectors/LICENSE            |   5 +
 .../LICENSE-testcontainers-java-mssqlserver.txt    |  21 +++
 pom.xml                                            |   1 +
 15 files changed, 678 insertions(+), 38 deletions(-)

diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml 
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 184e727a91..a91ea91e24 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -51,6 +51,14 @@
             </includes>
             <fileMode>0644</fileMode>
         </fileSet>
+        <fileSet>
+            
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                
<include>sort-connector-sqlserver-cdc-v1.15-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
         <fileSet>
             
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/target</directory>
             <outputDirectory>inlong-sort/connectors</outputDirectory>
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index b18604b67d..8e78f08a1c 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -257,6 +257,12 @@
                     <version>${project.version}</version>
                     <scope>test</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>sort-connector-sqlserver-cdc-v1.15</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.inlong</groupId>
                     <artifactId>sort-connector-mysql-cdc-v1.15</artifactId>
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 6841331770..37244cf14f 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -52,6 +52,12 @@
             <artifactId>mysql-connector-java</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+            <version>12.4.1.jre8</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
@@ -157,6 +163,14 @@
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </dependency>
+                        <dependency>
+                            <groupId>com.microsoft.sqlserver</groupId>
+                            <artifactId>mssql-jdbc</artifactId>
+                            <version>${mssql.jdbc.version}</version>
+                            <destFileName>mssql-driver.jar</destFileName>
+                            <type>jar</type>
+                            
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </dependency>
                         <artifactItem>
                             <groupId>org.apache.inlong</groupId>
                             
<artifactId>sort-connector-postgres-cdc-v1.15</artifactId>
@@ -181,6 +195,14 @@
                             <type>jar</type>
                             
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.inlong</groupId>
+                            
<artifactId>sort-connector-sqlserver-cdc-v1.15</artifactId>
+                            <version>${project.version}</version>
+                            
<destFileName>sort-connector-sqlserver-cdc.jar</destFileName>
+                            <type>jar</type>
+                            
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                        </artifactItem>
                     </artifactItems>
                 </configuration>
                 <executions>
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
index e02501cfd1..0b777cf429 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
@@ -47,6 +47,7 @@ import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINE
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
 
 /**
  * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
@@ -65,7 +66,7 @@ public class MysqlToRocksTest extends FlinkContainerTestEnv {
         try {
             sqlFile =
                     
Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
-            StarRocksManager.buildStarRocksImage();
+            buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksTest.java
similarity index 53%
copy from 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
copy to 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksTest.java
index e02501cfd1..01ab9eadb1 100644
--- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/MysqlToRocksTest.java
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/SqlserverToStarRocksTest.java
@@ -19,9 +19,8 @@ package org.apache.inlong.sort.tests;
 
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
-import org.apache.inlong.sort.tests.utils.MySqlContainer;
+import org.apache.inlong.sort.tests.utils.MSSQLServerContainer;
 import org.apache.inlong.sort.tests.utils.StarRocksContainer;
-import org.apache.inlong.sort.tests.utils.StarRocksManager;
 import org.apache.inlong.sort.tests.utils.TestUtils;
 
 import org.junit.AfterClass;
@@ -31,6 +30,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
 
 import java.net.URISyntaxException;
 import java.nio.file.Path;
@@ -47,25 +47,24 @@ import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINE
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
 import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
+import static 
org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
 
-/**
- * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
- * Test flink sql Mysql cdc to StarRocks
- */
-public class MysqlToRocksTest extends FlinkContainerTestEnv {
+public class SqlserverToStarRocksTest extends FlinkContainerTestEnv {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MysqlToRocksTest.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(SqlserverToStarRocksTest.class);
 
-    private static final Path mysqlJar = 
TestUtils.getResource("sort-connector-mysql-cdc.jar");
+    private static final Path sqlserverJar = 
TestUtils.getResource("sort-connector-sqlserver-cdc.jar");
     private static final Path jdbcJar = 
TestUtils.getResource("sort-connector-starrocks.jar");
-    private static final Path mysqlJdbcJar = 
TestUtils.getResource("mysql-driver.jar");
+
+    private static final Path mysqlJar = 
TestUtils.getResource("mysql-driver.jar");
+
     private static final String sqlFile;
 
     static {
         try {
-            sqlFile =
-                    
Paths.get(MysqlToRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
-            StarRocksManager.buildStarRocksImage();
+            sqlFile = 
Paths.get(SqlserverToStarRocksTest.class.getResource("/flinkSql/sqlserver_test.sql").toURI())
+                    .toString();
+            buildStarRocksImage();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
@@ -81,34 +80,55 @@ public class MysqlToRocksTest extends FlinkContainerTestEnv 
{
                     .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));
 
     @ClassRule
-    public static final MySqlContainer MYSQL_CONTAINER =
-            (MySqlContainer) new 
MySqlContainer(MySqlContainer.MySqlVersion.V8_0)
-                    .withDatabaseName("test")
+    public static final MSSQLServerContainer SQLSERVER_CONTAINER = 
(MSSQLServerContainer) new MSSQLServerContainer(
+            
DockerImageName.parse("mcr.microsoft.com/mssql/server").withTag("2022-latest"))
+                    .acceptLicense()
                     .withNetwork(NETWORK)
-                    .withNetworkAliases("mysql")
+                    .withNetworkAliases("sqlserver")
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
     @Before
     public void setup() {
         waitUntilJobRunning(Duration.ofSeconds(30));
-        initializeMysqlTable();
+        initializeSqlserverTable();
         initializeStarRocksTable(STAR_ROCKS);
     }
 
-    private void initializeMysqlTable() {
+    private void initializeSqlserverTable() {
         try {
-            Class.forName(MYSQL_CONTAINER.getDriverClassName());
+            // Waiting for MSSQL Agent started.
+            LOG.info("Sleep until the MSSQL Agent is started...");
+            Thread.sleep(20 * 1000);
+            LOG.info("Now continue initialize task...");
+            Class.forName(SQLSERVER_CONTAINER.getDriverClassName());
             Connection conn = DriverManager
-                    .getConnection(MYSQL_CONTAINER.getJdbcUrl(), 
MYSQL_CONTAINER.getUsername(),
-                            MYSQL_CONTAINER.getPassword());
+                    .getConnection(SQLSERVER_CONTAINER.getJdbcUrl(), 
SQLSERVER_CONTAINER.getUsername(),
+                            SQLSERVER_CONTAINER.getPassword());
             Statement stat = conn.createStatement();
+            stat.execute("CREATE DATABASE test;");
+            stat.execute("USE test;");
             stat.execute(
-                    "CREATE TABLE test_input1 (\n"
-                            + "  id SERIAL,\n"
-                            + "  name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
-                            + "  description VARCHAR(512),\n"
-                            + "  PRIMARY  KEY(id)\n"
-                            + ");");
+                    "CREATE TABLE test_input1 (\n" +
+                            "    id INT IDENTITY(1,1) NOT NULL PRIMARY KEY,\n" 
+
+                            "    name NVARCHAR(255) NOT NULL DEFAULT 
'flink',\n" +
+                            "    description NVARCHAR(512)\n" +
+                            ");");
+            stat.execute("if exists(select 1 from sys.databases where 
name='test' and is_cdc_enabled=0)\n" +
+                    "begin\n" +
+                    "    exec sys.sp_cdc_enable_db\n" +
+                    "end");
+            stat.execute("IF EXISTS(SELECT 1 FROM sys.tables WHERE 
name='test_input1' AND is_tracked_by_cdc = 0)\n" +
+                    "BEGIN\n" +
+                    "    EXEC sys.sp_cdc_enable_table\n" +
+                    "        @source_schema = 'dbo', -- source_schema\n" +
+                    "        @source_name = 'test_input1', -- table_name\n" +
+                    "        @capture_instance = NULL, -- capture_instance\n" +
+                    "        @supports_net_changes = '1', -- 
capture_instance\n" +
+                    "        @index_name = NULL,  -- \n" +
+                    "        @captured_column_list  = NULL, -- \n" +
+                    "        @filegroup_name = 'PRIMARY', -- \n" +
+                    "        @role_name = NULL -- role_name\n" +
+                    "END");
             stat.close();
             conn.close();
         } catch (Exception e) {
@@ -118,8 +138,8 @@ public class MysqlToRocksTest extends FlinkContainerTestEnv 
{
 
     @AfterClass
     public static void teardown() {
-        if (MYSQL_CONTAINER != null) {
-            MYSQL_CONTAINER.stop();
+        if (SQLSERVER_CONTAINER != null) {
+            SQLSERVER_CONTAINER.stop();
         }
         if (STAR_ROCKS != null) {
             STAR_ROCKS.stop();
@@ -132,20 +152,26 @@ public class MysqlToRocksTest extends 
FlinkContainerTestEnv {
      * @throws Exception The exception may throws when execute the case
      */
     @Test
-    public void testMysqlUpdateAndDelete() throws Exception {
-        submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+    public void testSqlserverUpdateAndDelete() throws Exception {
+        submitSQLJob(sqlFile, jdbcJar, sqlserverJar, mysqlJar);
         waitUntilJobRunning(Duration.ofSeconds(10));
 
         // generate input
         try (Connection conn =
-                DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), 
MYSQL_CONTAINER.getUsername(),
-                        MYSQL_CONTAINER.getPassword());
+                DriverManager.getConnection(SQLSERVER_CONTAINER.getJdbcUrl(), 
SQLSERVER_CONTAINER.getUsername(),
+                        SQLSERVER_CONTAINER.getPassword());
                 Statement stat = conn.createStatement()) {
+            stat.execute("USE test;");
             stat.execute(
-                    "INSERT INTO test_input1 "
-                            + "VALUES (1,'jacket','water resistent white wind 
breaker');");
+                    "SET IDENTITY_INSERT test_input1 ON;" +
+                            "INSERT INTO test_input1 (id, name, description) "
+                            + "VALUES (1, 'jacket','water resistent white wind 
breaker');" +
+                            "SET IDENTITY_INSERT test_input1 OFF;");
             stat.execute(
-                    "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel 
scooter ');");
+                    "SET IDENTITY_INSERT test_input1 ON;" +
+                            "INSERT INTO test_input1 (id, name, description) " 
+
+                            "VALUES (2,'scooter','Big 2-wheel scooter ');" +
+                            "SET IDENTITY_INSERT test_input1 OFF;");
             stat.execute(
                     "update test_input1 set name = 'tom' where id = 2;");
             stat.execute(
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
new file mode 100644
index 0000000000..21cdf1b3f0
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import com.google.common.collect.Sets;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.LicenseAcceptance;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+/**
+ * Docker container for Microsoft SQL Server.
+ * Based on {@link org.testcontainers.containers.MSSQLServerContainer}, with 
some settings edited for tests.
+ */
+public class MSSQLServerContainer extends JdbcDatabaseContainer {
+
+    private static final DockerImageName DEFAULT_IMAGE_NAME = 
DockerImageName.parse("mcr.microsoft.com/mssql/server");
+    /** @deprecated */
+    @Deprecated
+    public static final String DEFAULT_TAG = "2017-CU12";
+    public static final String NAME = "sqlserver";
+    public static final String IMAGE;
+    public static final Integer MS_SQL_SERVER_PORT;
+    static final String DEFAULT_USER = "SA";
+    static final String DEFAULT_PASSWORD = "A_Str0ng_Required_Password";
+    private String password;
+    private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 240;
+    private static final Pattern[] PASSWORD_CATEGORY_VALIDATION_PATTERNS;
+
+    /** @deprecated */
+    @Deprecated
+    public MSSQLServerContainer() {
+        this(DEFAULT_IMAGE_NAME.withTag("2017-CU12"));
+    }
+
+    public MSSQLServerContainer(String dockerImageName) {
+        this(DockerImageName.parse(dockerImageName));
+    }
+
+    public MSSQLServerContainer(DockerImageName dockerImageName) {
+        super(dockerImageName);
+        this.password = "A_Str0ng_Required_Password";
+        dockerImageName.assertCompatibleWith(new 
DockerImageName[]{DEFAULT_IMAGE_NAME});
+        this.withStartupTimeoutSeconds(240);
+        this.withConnectTimeoutSeconds(240);
+        this.addExposedPort(MS_SQL_SERVER_PORT);
+    }
+
+    public Set<Integer> getLivenessCheckPortNumbers() {
+        return Sets.newHashSet(new Integer[]{MS_SQL_SERVER_PORT});
+    }
+
+    protected void configure() {
+        if (!this.getEnvMap().containsKey("ACCEPT_EULA")) {
+            LicenseAcceptance.assertLicenseAccepted(this.getDockerImageName());
+            this.acceptLicense();
+        }
+
+        this.addEnv("SA_PASSWORD", this.password);
+        this.addEnv("MSSQL_AGENT_ENABLED", "true");
+        this.addFixedExposedPort(14433, MS_SQL_SERVER_PORT);
+    }
+
+    public MSSQLServerContainer acceptLicense() {
+        this.addEnv("ACCEPT_EULA", "Y");
+        return this;
+    }
+
+    public String getDriverClassName() {
+        return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
+    }
+
+    protected String constructUrlForConnection(String queryString) {
+
+        if (urlParameters.keySet().stream().map(sp -> ((String) 
sp).toLowerCase()).noneMatch("encrypt"::equals)) {
+            urlParameters.put("encrypt", "false");
+        }
+        return super.constructUrlForConnection(queryString);
+    }
+
+    public String getJdbcUrl() {
+        String additionalUrlParams = this.constructUrlParameters(";", ";");
+        return "jdbc:sqlserver://" + this.getHost() + ":" + 
this.getMappedPort(MS_SQL_SERVER_PORT)
+                + additionalUrlParams;
+    }
+
+    public String getUsername() {
+        return "SA";
+    }
+
+    public String getPassword() {
+        return this.password;
+    }
+
+    public String getTestQueryString() {
+        return "SELECT 1";
+    }
+
+    public MSSQLServerContainer withPassword(String password) {
+        this.checkPasswordStrength(password);
+        this.password = password;
+        return this;
+    }
+
+    private void checkPasswordStrength(String password) {
+        if (password == null) {
+            throw new IllegalArgumentException("Null password is not allowed");
+        } else if (password.length() < 8) {
+            throw new IllegalArgumentException("Password should be at least 8 
characters long");
+        } else if (password.length() > 128) {
+            throw new IllegalArgumentException("Password can be up to 128 
characters long");
+        } else {
+            long satisfiedCategories = 
Stream.of(PASSWORD_CATEGORY_VALIDATION_PATTERNS).filter((p) -> {
+                return p.matcher(password).find();
+            }).count();
+            if (satisfiedCategories < 3L) {
+                throw new IllegalArgumentException(
+                        "Password must contain characters from three of the 
following four categories:\n - Latin uppercase letters (A through Z)\n - Latin 
lowercase letters (a through z)\n - Base 10 digits (0 through 9)\n - 
Non-alphanumeric characters such as: exclamation point (!), dollar sign ($), 
number sign (#), or percent (%).");
+            }
+        }
+    }
+
+    static {
+        IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart();
+        MS_SQL_SERVER_PORT = 1433;
+        PASSWORD_CATEGORY_VALIDATION_PATTERNS = new 
Pattern[]{Pattern.compile("[A-Z]+"), Pattern.compile("[a-z]+"),
+                Pattern.compile("[0-9]+"), Pattern.compile("[^a-zA-Z0-9]+", 
2)};
+    }
+}
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/sqlserver/setup.sql
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/sqlserver/setup.sql
new file mode 100644
index 0000000000..aadbc1baae
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/docker/sqlserver/setup.sql
@@ -0,0 +1,26 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--   http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Setup script for the SQL Server test container. It does not create any 
users or grant any privileges;
+-- the statement below only deals with change data capture (CDC) at the 
database level.
+-- It does the following:
+--
+-- 1) looks in sys.databases for a row named 'master' with is_cdc_enabled = 0
+-- 2) if such a row exists, executes sys.sp_cdc_enable_db
+--
+if exists(select 1 from sys.databases where name='master' and is_cdc_enabled=0)
+begin
+exec sys.sp_cdc_enable_db
+end
\ No newline at end of file
diff --git 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/sqlserver_test.sql
 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/sqlserver_test.sql
new file mode 100644
index 0000000000..8bbd55d736
--- /dev/null
+++ 
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/sqlserver_test.sql
@@ -0,0 +1,36 @@
+CREATE TABLE test_input1 (
+    `id` INT,
+    name STRING,
+    description STRING,
+    PRIMARY KEY(`id`) NOT ENFORCED
+) WITH (
+    'connector' = 'sqlserver-cdc-inlong',
+    'hostname' = 'sqlserver',
+    'port' = '1433',
+    'username' = 'sa',
+    'password' = 'A_Str0ng_Required_Password',
+    'database-name' = 'test',
+    'table-name' = 'test_input1',
+    'schema-name' = 'dbo'
+);
+CREATE TABLE test_output1 (
+    `id` INT,
+    name STRING,
+    description STRING
+) WITH (
+    'connector' = 'starrocks-inlong',
+    'jdbc-url' = 'jdbc:mysql://starrocks:9030',
+    'load-url'='starrocks:8030',
+    'database-name'='test',
+    'table-name' = 'test_output1',
+    'username' = 'inlong',
+    'password' = 'inlong',
+    'sink.properties.format' = 'json',
+    'sink.properties.strip_outer_array' = 'true',
+    'sink.buffer-flush.interval-ms' = '1000'
+);
+
+INSERT INTO test_output1 select * from test_input1;
+
+
+
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 73a81bf262..bf102894cb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -34,6 +34,7 @@
     <modules>
         <module>postgres-cdc</module>
         <module>starrocks</module>
+        <module>sqlserver-cdc</module>
         <module>mysql-cdc</module>
         <module>iceberg</module>
     </modules>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/pom.xml
new file mode 100644
index 0000000000..d0e49f7184
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.15</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-sqlserver-cdc-v1.15</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-sqlserver-cdc</name>
+
+    <properties>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-sqlserver-cdc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-log4j-appender</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka-clients.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>io.debezium:debezium-api</include>
+                                    
<include>io.debezium:debezium-embedded</include>
+                                    
<include>io.debezium:debezium-core</include>
+                                    
<include>io.debezium:debezium-connector-sqlserver</include>
+                                    <include>com.ververica:*</include>
+                                    
<include>com.microsoft.sqlserver:*</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>com.fasterxml.*:*</include>
+                                    <include>com.google.guava:*</include>
+                                    <!--  Include fixed version 18.0-13.0 of 
flink shaded guava  -->
+                                    
<include>org.apache.flink:flink-shaded-guava</include>
+                                    <include>com.google.protobuf:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.kafka:*</artifact>
+                                    <excludes>
+                                        
<exclude>kafka/kafka-version.properties</exclude>
+                                        <exclude>LICENSE</exclude>
+                                        <!-- Does not contain anything 
relevant.
+                                            Cites a binary dependency on 
jersey, but this is neither reflected in the
+                                            dependency graph, nor are any 
jersey files bundled. -->
+                                        <exclude>NOTICE</exclude>
+                                        <exclude>common/**</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.shaded.org.apache.kafka</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.shaded.com.google</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.shaded.com.fasterxml</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
new file mode 100644
index 0000000000..bab8511709
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.sqlserver;
+
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerTableSource;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import static 
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+import static 
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static 
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+
+/**
+ * Factory that creates a configured {@link SqlServerTableSource} for tables declared with the
+ * {@code sqlserver-cdc-inlong} connector identifier.
+ *
+ * <p>Required options: {@code hostname}, {@code username}, {@code password},
+ * {@code database-name}, {@code schema-name} and {@code table-name}. Optional options:
+ * {@code port} (default 1433), {@code server-time-zone} (default UTC) and
+ * {@code scan.startup.mode} (default initial). Options under the Debezium prefix are skipped
+ * by option validation and passed to the source through {@code getDebeziumProperties}.
+ */
+public class SqlserverTableFactory implements DynamicTableSourceFactory {
+
+    private static final String IDENTIFIER = "sqlserver-cdc-inlong";
+
+    // Accepted values of 'scan.startup.mode'; the configured value is matched case-insensitively.
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
+    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY = "initial-only";
+    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+
+    private static final ConfigOption<String> SCHEMA_NAME =
+            ConfigOptions.key("schema-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Schema name of the SqlServer database to monitor.");
+
+    private static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the SqlServer database server.");
+
+    private static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(1433)
+                    .withDescription("Integer port number of the SqlServer database server.");
+
+    private static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the SqlServer user to use when connecting to the SqlServer database server.");
+
+    private static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the SqlServer database server.");
+
+    private static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("database-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Database name of the SqlServer server to monitor.");
+
+    private static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the SqlServer database to monitor.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database server.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for SqlServer CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"initial-only\", \"latest-offset\"");
+
+    /** Returns the connector identifier used in the {@code 'connector'} table option. */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Returns the options that must be present in the table definition. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(HOSTNAME);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(DATABASE_NAME);
+        options.add(SCHEMA_NAME);
+        options.add(TABLE_NAME);
+        return options;
+    }
+
+    /** Returns the options that may be omitted; each of them declares a default value. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PORT);
+        options.add(SERVER_TIME_ZONE);
+        options.add(SCAN_STARTUP_MODE);
+        return options;
+    }
+
+    /**
+     * Validates the table options and builds the SQL Server CDC table source from them.
+     *
+     * @param context factory context providing the catalog table and its options
+     * @return a {@link SqlServerTableSource} configured from the table options
+     * @throws ValidationException if {@code scan.startup.mode} holds an unsupported value
+     */
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        // Debezium pass-through keys are not declared as options, so exclude them from validation.
+        helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX);
+
+        final ReadableConfig config = helper.getOptions();
+        String hostname = config.get(HOSTNAME);
+        String username = config.get(USERNAME);
+        String password = config.get(PASSWORD);
+        String databaseName = config.get(DATABASE_NAME);
+        String tableName = config.get(TABLE_NAME);
+        String schemaName = config.get(SCHEMA_NAME);
+        ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
+        int port = config.get(PORT);
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
+
+        return new SqlServerTableSource(
+                physicalSchema,
+                port,
+                hostname,
+                databaseName,
+                schemaName,
+                tableName,
+                serverTimeZone,
+                username,
+                password,
+                getDebeziumProperties(context.getCatalogTable().getOptions()),
+                getStartupOptions(config));
+    }
+
+    /**
+     * Maps the configured {@code scan.startup.mode} value to the matching {@link StartupOptions}.
+     *
+     * @param config validated table options
+     * @return the startup options for the configured mode
+     * @throws ValidationException if the value is not one of the supported modes
+     */
+    private static StartupOptions getStartupOptions(ReadableConfig config) {
+        String modeString = config.get(SCAN_STARTUP_MODE);
+
+        // Locale.ROOT keeps the match independent of the JVM default locale; under a Turkish
+        // locale "INITIAL".toLowerCase() would not produce "initial".
+        switch (modeString.toLowerCase(Locale.ROOT)) {
+            case SCAN_STARTUP_MODE_VALUE_INITIAL:
+                return StartupOptions.initial();
+
+            case SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY:
+                return StartupOptions.initialOnly();
+
+            case SCAN_STARTUP_MODE_VALUE_LATEST:
+                return StartupOptions.latest();
+
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s",
+                                SCAN_STARTUP_MODE.key(),
+                                SCAN_STARTUP_MODE_VALUE_INITIAL,
+                                SCAN_STARTUP_MODE_VALUE_INITIAL_ONLY,
+                                SCAN_STARTUP_MODE_VALUE_LATEST,
+                                modeString));
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..920275133f
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.sqlserver.SqlserverTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index e83548b61e..dbc5201855 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -1187,6 +1187,11 @@ The text of each license is also included at 
licenses/LICENSE-[project].txt.
   com.microsoft.sqlserver:mssql-jdbc:7.2.2.jre8 - Microsoft JDBC Driver for 
SQL Server (https://github.com/Microsoft/mssql-jdbc/tree/v7.2.2), (MIT License)
   org.slf4j:slf4j-api:1.7.36 - SLF4J API Module (http://www.slf4j.org), (MIT 
License)
 
+Codes:
+  1. 
inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/MSSQLServerContainer.java
+
+  Source: testcontainers-java:mssqlserver:1.17
+  License: 
https://github.com/testcontainers/testcontainers-java/blob/main/LICENSE
 
 ========================================================================
 CDDL licenses
diff --git 
a/licenses/inlong-sort-connectors/licenses/LICENSE-testcontainers-java-mssqlserver.txt
 
b/licenses/inlong-sort-connectors/licenses/LICENSE-testcontainers-java-mssqlserver.txt
new file mode 100644
index 0000000000..2e38890202
--- /dev/null
+++ 
b/licenses/inlong-sort-connectors/licenses/LICENSE-testcontainers-java-mssqlserver.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015-2019 Richard North
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 84cc943718..3bfb8d06f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
         <postgresql.version>42.4.3</postgresql.version>
         <oracle.jdbc.version>19.3.0.0</oracle.jdbc.version>
         <mysql.jdbc.version>8.0.28</mysql.jdbc.version>
+        <mssql.jdbc.version>12.4.1.jre8</mssql.jdbc.version>
         <sqlserver.jdbc.version>7.2.2.jre8</sqlserver.jdbc.version>
         <mybatis.starter.version>2.1.3</mybatis.starter.version>
         <mybatis.version>3.5.9</mybatis.version>


Reply via email to