This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5795b265cc [Fix][StarRocks] Fix NPE when upstream catalogtable table path only have table name part (#6540)
5795b265cc is described below

commit 5795b265ccbb66466220da3e8fd251cf551a6ebb
Author: Jarvis <[email protected]>
AuthorDate: Fri Mar 29 11:28:30 2024 +0800

    [Fix][StarRocks] Fix NPE when upstream catalogtable table path only have table name part (#6540)
---
 .../starrocks/sink/StarRocksSinkFactory.java       |  10 +-
 .../e2e/connector/starrocks/StarRocksIT.java       |  34 ++++++-
 .../src/test/resources/fake-to-starrocks.conf      | 106 +++++++++++++++++++++
 3 files changed, 145 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 08fc690698..f05f912b6f 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -89,13 +89,15 @@ public class StarRocksSinkFactory implements TableSinkFactory {
         String sinkDatabaseName = sinkConfig.getDatabase();
         String sinkTableName = sinkConfig.getTable();
         // to replace
-        String finalDatabaseName =
-                sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName);
+        sinkDatabaseName =
+                sinkDatabaseName.replace(
+                        REPLACE_DATABASE_NAME_KEY,
+                        sourceDatabaseName != null ? sourceDatabaseName : "");
         String finalTableName = this.replaceFullTableName(sinkTableName, tableId);
         // rebuild TableIdentifier and catalogTable
         TableIdentifier newTableId =
                 TableIdentifier.of(
-                        tableId.getCatalogName(), finalDatabaseName, null, finalTableName);
+                        tableId.getCatalogName(), sinkDatabaseName, null, finalTableName);
         catalogTable =
                 CatalogTable.of(
                         newTableId,
@@ -107,7 +109,7 @@ public class StarRocksSinkFactory implements TableSinkFactory {
         CatalogTable finalCatalogTable = catalogTable;
         // reset
         sinkConfig.setTable(finalTableName);
-        sinkConfig.setDatabase(finalDatabaseName);
+        sinkConfig.setDatabase(sinkDatabaseName);
         return () -> new StarRocksSink(sinkConfig, finalCatalogTable, context.getOptions());
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index ff8a934cb4..783b0416ba 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -110,6 +110,31 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
                     + "\"storage_format\" = \"DEFAULT\""
                     + ")";
 
+    private static final String DDL_FAKE_SINK_TABLE =
+            "create table "
+                    + DATABASE
+                    + "."
+                    + "fake_table_sink"
+                    + " (\n"
+                    + "  id     BIGINT,\n"
+                    + "  c_string   STRING,\n"
+                    + "  c_boolean    BOOLEAN,\n"
+                    + "  c_tinyint    TINYINT,\n"
+                    + "  c_int        INT,\n"
+                    + "  c_bigint     BIGINT,\n"
+                    + "  c_float      FLOAT,\n"
+                    + "  c_double     DOUBLE,\n"
+                    + "  c_decimal    Decimal(2, 1),\n"
+                    + "  c_date       DATE\n"
+                    + ")ENGINE=OLAP\n"
+                    + "DUPLICATE KEY(`id`)\n"
+                    + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+                    + "PROPERTIES (\n"
+                    + "\"replication_num\" = \"1\",\n"
+                    + "\"in_memory\" = \"false\","
+                    + "\"storage_format\" = \"DEFAULT\""
+                    + ")";
+
     private static final String INIT_DATA_SQL =
             "insert into "
                     + DATABASE
@@ -253,6 +278,13 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
         }
     }
 
+    @TestTemplate
+    public void testSinkWithCatalogTableNameOnly(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/fake-to-starrocks.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     private void initializeJdbcConnection()
             throws SQLException, ClassNotFoundException, MalformedURLException,
                     InstantiationException, IllegalAccessException {
@@ -274,7 +306,7 @@ public class StarRocksIT extends TestSuiteBase implements TestResource {
             // create source table
             statement.execute(DDL_SOURCE);
             // create sink table
-            // statement.execute(DDL_SINK);
+            statement.execute(DDL_FAKE_SINK_TABLE);
         } catch (SQLException e) {
             throw new RuntimeException("Initializing table failed!", e);
         }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
new file mode 100644
index 0000000000..3dca9e725c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 100
+    schema {
+        table = "FakeTable"
+        columns = [
+           {
+              name = id
+              type = bigint
+              nullable = false
+              defaultValue = 0
+           },
+           {
+              name = c_string
+              type = string
+              nullable = true
+           },
+           {
+              name = c_boolean
+              type = boolean
+              nullable = true
+           },
+           {
+              name = c_tinyint
+              type = tinyint
+              nullable = true
+           },
+           {
+              name = c_int
+              type = int
+              nullable = true
+           },
+           {
+              name = c_bigint
+              type = bigint
+              nullable = true
+           },
+           {
+              name = c_float
+              type = float
+              nullable = true
+           },
+          {
+             name = c_double
+             type = double
+             nullable = true
+          },
+          {
+             name = c_decimal
+             type = "decimal(2, 1)"
+             nullable = true
+          },
+          {
+             name = c_date
+             type = date
+             nullable = true
+          }
+       ]
+      }
+    }
+}
+
+transform {
+}
+
+sink {
+  StarRocks {
+    source_table_name = "fake"
+    nodeUrls = ["starrocks_e2e:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "fake_table_sink"
+    batch_max_rows = 100
+    max_retries = 3
+    base-url="jdbc:mysql://starrocks_e2e:9030/test"
+    starrocks.config = {
+      format = "JSON"
+      strip_outer_array = true
+    }
+  }
+}
\ No newline at end of file

Reply via email to