This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5795b265cc [Fix][StarRocks] Fix NPE when the upstream CatalogTable table
path has only the table name part (#6540)
5795b265cc is described below
commit 5795b265ccbb66466220da3e8fd251cf551a6ebb
Author: Jarvis <[email protected]>
AuthorDate: Fri Mar 29 11:28:30 2024 +0800
[Fix][StarRocks] Fix NPE when the upstream CatalogTable table path has only
the table name part (#6540)
---
.../starrocks/sink/StarRocksSinkFactory.java | 10 +-
.../e2e/connector/starrocks/StarRocksIT.java | 34 ++++++-
.../src/test/resources/fake-to-starrocks.conf | 106 +++++++++++++++++++++
3 files changed, 145 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 08fc690698..f05f912b6f 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -89,13 +89,15 @@ public class StarRocksSinkFactory implements
TableSinkFactory {
String sinkDatabaseName = sinkConfig.getDatabase();
String sinkTableName = sinkConfig.getTable();
// to replace
- String finalDatabaseName =
- sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY,
sourceDatabaseName);
+ sinkDatabaseName =
+ sinkDatabaseName.replace(
+ REPLACE_DATABASE_NAME_KEY,
+ sourceDatabaseName != null ? sourceDatabaseName : "");
String finalTableName = this.replaceFullTableName(sinkTableName,
tableId);
// rebuild TableIdentifier and catalogTable
TableIdentifier newTableId =
TableIdentifier.of(
- tableId.getCatalogName(), finalDatabaseName, null,
finalTableName);
+ tableId.getCatalogName(), sinkDatabaseName, null,
finalTableName);
catalogTable =
CatalogTable.of(
newTableId,
@@ -107,7 +109,7 @@ public class StarRocksSinkFactory implements
TableSinkFactory {
CatalogTable finalCatalogTable = catalogTable;
// reset
sinkConfig.setTable(finalTableName);
- sinkConfig.setDatabase(finalDatabaseName);
+ sinkConfig.setDatabase(sinkDatabaseName);
return () -> new StarRocksSink(sinkConfig, finalCatalogTable,
context.getOptions());
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index ff8a934cb4..783b0416ba 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -110,6 +110,31 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
+ "\"storage_format\" = \"DEFAULT\""
+ ")";
+ private static final String DDL_FAKE_SINK_TABLE =
+ "create table "
+ + DATABASE
+ + "."
+ + "fake_table_sink"
+ + " (\n"
+ + " id BIGINT,\n"
+ + " c_string STRING,\n"
+ + " c_boolean BOOLEAN,\n"
+ + " c_tinyint TINYINT,\n"
+ + " c_int INT,\n"
+ + " c_bigint BIGINT,\n"
+ + " c_float FLOAT,\n"
+ + " c_double DOUBLE,\n"
+ + " c_decimal Decimal(2, 1),\n"
+ + " c_date DATE\n"
+ + ")ENGINE=OLAP\n"
+ + "DUPLICATE KEY(`id`)\n"
+ + "DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\",\n"
+ + "\"in_memory\" = \"false\","
+ + "\"storage_format\" = \"DEFAULT\""
+ + ")";
+
private static final String INIT_DATA_SQL =
"insert into "
+ DATABASE
@@ -253,6 +278,13 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
}
}
+ @TestTemplate
+ public void testSinkWithCatalogTableNameOnly(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/fake-to-starrocks.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
private void initializeJdbcConnection()
throws SQLException, ClassNotFoundException, MalformedURLException,
InstantiationException, IllegalAccessException {
@@ -274,7 +306,7 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
// create source table
statement.execute(DDL_SOURCE);
// create sink table
- // statement.execute(DDL_SINK);
+ statement.execute(DDL_FAKE_SINK_TABLE);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
new file mode 100644
index 0000000000..3dca9e725c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/fake-to-starrocks.conf
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ parallelism = 1
+ result_table_name = "fake"
+ row.num = 100
+ schema {
+ table = "FakeTable"
+ columns = [
+ {
+ name = id
+ type = bigint
+ nullable = false
+ defaultValue = 0
+ },
+ {
+ name = c_string
+ type = string
+ nullable = true
+ },
+ {
+ name = c_boolean
+ type = boolean
+ nullable = true
+ },
+ {
+ name = c_tinyint
+ type = tinyint
+ nullable = true
+ },
+ {
+ name = c_int
+ type = int
+ nullable = true
+ },
+ {
+ name = c_bigint
+ type = bigint
+ nullable = true
+ },
+ {
+ name = c_float
+ type = float
+ nullable = true
+ },
+ {
+ name = c_double
+ type = double
+ nullable = true
+ },
+ {
+ name = c_decimal
+ type = "decimal(2, 1)"
+ nullable = true
+ },
+ {
+ name = c_date
+ type = date
+ nullable = true
+ }
+ ]
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ StarRocks {
+ source_table_name = "fake"
+ nodeUrls = ["starrocks_e2e:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "fake_table_sink"
+ batch_max_rows = 100
+ max_retries = 3
+ base-url="jdbc:mysql://starrocks_e2e:9030/test"
+ starrocks.config = {
+ format = "JSON"
+ strip_outer_array = true
+ }
+ }
+}
\ No newline at end of file