This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new edcaacecb1 [Feature][Connector] update sqlserver catalog for save mode (#6086)
edcaacecb1 is described below
commit edcaacecb13f432c0ce0022aa6febc2f9de450de
Author: 老王 <[email protected]>
AuthorDate: Wed Jan 3 11:28:59 2024 +0800
[Feature][Connector] update sqlserver catalog for save mode (#6086)
---
.../jdbc/catalog/sqlserver/SqlServerCatalog.java | 10 +++++++
.../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 35 ++++++++++++++++++++++
2 files changed, 45 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 478ef873c7..bf58323ba0 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -237,4 +237,14 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new
SqlserverTypeMapper());
}
+
+ @Override
+ public String getExistDataSql(TablePath tablePath) {
+ return String.format("select TOP 1 * from %s ;",
tablePath.getFullNameWithQuoted("[", "]"));
+ }
+
+    /**
+     * Builds the {@code TRUNCATE TABLE} statement for the given table.
+     *
+     * <p>The table name is taken from {@code getFullNameWithQuoted} with {@code [} and {@code ]}
+     * passed as the quote characters.
+     *
+     * @param tablePath path of the table to truncate
+     * @return the formatted {@code TRUNCATE TABLE} statement
+     * @throws CatalogException declared by the overridden method; this implementation only
+     *     formats a string
+     */
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
+        String quotedTableName = tablePath.getFullNameWithQuoted("[", "]");
+        return String.format("TRUNCATE TABLE %s", quotedTableName);
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index e56fa37573..e871d81fd1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -17,14 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -190,4 +195,34 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
SQLSERVER_SCHEMA);
catalog.open();
}
+
+    /**
+     * Walks the SQL Server catalog through a full table lifecycle on {@code master.dbo.sink_lw}:
+     * create from the {@code source} table definition, check for data, insert one row, truncate,
+     * drop, and finally close the catalog.
+     *
+     * @param container test container supplied by the test template; not used by the body
+     */
+    @TestTemplate
+    public void testCatalog(TestContainer container) throws IOException, InterruptedException {
+        SqlServerCatalog sqlServerCatalog = (SqlServerCatalog) catalog;
+        TablePath sourceTable = TablePath.of("master", "dbo", "source");
+        TablePath sinkTable = TablePath.of("master", "dbo", "sink_lw");
+
+        // Load the source definition, then make sure the sink table is absent before creating it.
+        CatalogTable sourceDefinition = sqlServerCatalog.getTable(sourceTable);
+        Assertions.assertFalse(sqlServerCatalog.tableExists(sinkTable));
+
+        // Create the sink table from the source definition.
+        sqlServerCatalog.createTable(sinkTable, sourceDefinition, true);
+        Assertions.assertTrue(sqlServerCatalog.tableExists(sinkTable));
+
+        // A freshly created table must report no data.
+        Assertions.assertFalse(sqlServerCatalog.isExistsData(sinkTable));
+
+        // After inserting a single row the table must report data.
+        sqlServerCatalog.executeSql(
+                sinkTable, "insert into sink_lw(age, name) values(12,'laowang')");
+        Assertions.assertTrue(sqlServerCatalog.isExistsData(sinkTable));
+
+        // Truncating must leave the table empty again.
+        sqlServerCatalog.truncateTable(sinkTable, true);
+        Assertions.assertFalse(sqlServerCatalog.isExistsData(sinkTable));
+
+        // Dropping must remove the table from the catalog.
+        sqlServerCatalog.dropTable(sinkTable, true);
+        Assertions.assertFalse(sqlServerCatalog.tableExists(sinkTable));
+
+        sqlServerCatalog.close();
+    }
}