This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 557159d3ce [feature](JdbcExternalCatalog) support insert data in
JdbcExternalCatalog (#16271)
557159d3ce is described below
commit 557159d3ceff022903839e45ab07d94c922d244d
Author: Tiewei Fang <[email protected]>
AuthorDate: Thu Feb 2 17:31:33 2023 +0800
[feature](JdbcExternalCatalog) support insert data in JdbcExternalCatalog
(#16271)
---
be/src/exec/table_connector.cpp | 9 +++--
.../docker-compose/mysql/init/03-create-table.sql | 5 +++
.../docker-compose/oracle/init/03-create-table.sql | 6 ++++
.../postgresql/init/02-create-table.sql | 6 ++++
.../java/org/apache/doris/analysis/InsertStmt.java | 39 ++++++++++++++++------
.../doris/transaction/DatabaseTransactionMgr.java | 3 +-
.../doris/transaction/GlobalTransactionMgr.java | 5 +--
.../jdbc_catalog_p0/test_mysql_jdbc_catalog.out | 13 ++++++++
.../jdbc_catalog_p0/test_oracle_jdbc_catalog.out | 13 ++++++++
.../data/jdbc_catalog_p0/test_pg_jdbc_catalog.out | 13 ++++++++
.../jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy | 23 ++++++++++---
.../test_oracle_jdbc_catalog.groovy | 14 +++++++-
.../jdbc_catalog_p0/test_pg_jdbc_catalog.groovy | 13 ++++++++
13 files changed, 140 insertions(+), 22 deletions(-)
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index 12dc3acdc2..30b01b1d03 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -226,8 +226,13 @@ Status TableConnector::convert_column_data(const
vectorized::ColumnPtr& column_p
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
- // here need check the ' is used, now for pg array string must be "
- fmt::format_to(_insert_stmt_buffer, "\"{}\"",
fmt::basic_string_view(item, size));
+ // TODO(zhangstar333): check array data type of postgresql
+ // for oracle/pg database string must be '
+ if (table_type == TOdbcTableType::ORACLE || table_type ==
TOdbcTableType::POSTGRESQL) {
+ fmt::format_to(_insert_stmt_buffer, "'{}'",
fmt::basic_string_view(item, size));
+ } else {
+ fmt::format_to(_insert_stmt_buffer, "\"{}\"",
fmt::basic_string_view(item, size));
+ }
break;
}
case TYPE_ARRAY: {
diff --git a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
index 02c257cbc8..8fb1aebc4b 100644
--- a/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
+++ b/docker/thirdparties/docker-compose/mysql/init/03-create-table.sql
@@ -223,4 +223,9 @@ create table doris_test.ex_tb20 (
decimal_unsigned_long decimal(65, 5) unsigned
) engine=innodb charset=utf8;
+create table doris_test.test_insert (
+ `id` varchar(128) NULL,
+ `name` varchar(128) NULL,
+ `age` int NULL
+) engine=innodb charset=utf8;
diff --git a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
index d5dd8cf1c6..d2d8d6af7e 100644
--- a/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
+++ b/docker/thirdparties/docker-compose/oracle/init/03-create-table.sql
@@ -78,3 +78,9 @@ t4 timestamp,
t5 interval year(3) to month,
t6 interval day(3) to second(6)
);
+
+create table doris_test.test_insert(
+id varchar2(128),
+name varchar2(128),
+age number(5)
+);
diff --git
a/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
b/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
index b721da297a..6ace3b20cb 100644
--- a/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
+++ b/docker/thirdparties/docker-compose/postgresql/init/02-create-table.sql
@@ -150,3 +150,9 @@ CREATE TABLE catalog_pg_test.test12 (
ID INT NOT NULL,
uuid_value uuid
);
+
+CREATE TABLE catalog_pg_test.test_insert (
+ id varchar(128),
+ name varchar(128),
+ age int
+);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 44140b24e9..891fe3349b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -31,6 +31,8 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.JdbcExternalDatabase;
+import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@@ -39,6 +41,8 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.JdbcExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
@@ -110,7 +114,7 @@ public class InsertStmt extends DdlStmt {
private Table targetTable;
- private Database db;
+ private DatabaseIf db;
private long transactionId;
// we need a new TupleDesc for olap table.
@@ -191,12 +195,15 @@ public class InsertStmt extends DdlStmt {
// get dbs of statement
queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
tblName.analyze(analyzer);
- // disallow external catalog
- Util.prohibitExternalCatalog(tblName.getCtl(),
this.getClass().getSimpleName());
+ // disallow external catalog except JdbcExternalCatalog
+ if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
+ && !(analyzer.getEnv().getCurrentCatalog() instanceof
JdbcExternalCatalog)) {
+ Util.prohibitExternalCatalog(tblName.getCtl(),
this.getClass().getSimpleName());
+ }
String dbName = tblName.getDb();
String tableName = tblName.getTbl();
// check exist
- DatabaseIf db =
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
+ DatabaseIf db =
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
// check access
@@ -247,7 +254,7 @@ public class InsertStmt extends DdlStmt {
return dataSink;
}
- public Database getDbObj() {
+ public DatabaseIf getDbObj() {
return db;
}
@@ -261,8 +268,11 @@ public class InsertStmt extends DdlStmt {
if (targetTable == null) {
tblName.analyze(analyzer);
- // disallow external catalog
- Util.prohibitExternalCatalog(tblName.getCtl(),
this.getClass().getSimpleName());
+ // disallow external catalog except JdbcExternalCatalog
+ if (analyzer.getEnv().getCurrentCatalog() instanceof
ExternalCatalog
+ && !(analyzer.getEnv().getCurrentCatalog() instanceof
JdbcExternalCatalog)) {
+ Util.prohibitExternalCatalog(tblName.getCtl(),
this.getClass().getSimpleName());
+ }
}
// Check privilege
@@ -292,8 +302,7 @@ public class InsertStmt extends DdlStmt {
// create data sink
createDataSink();
- db =
analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
-
+ db =
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
// create label and begin transaction
long timeoutSecond =
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
if (Strings.isNullOrEmpty(label)) {
@@ -322,8 +331,16 @@ public class InsertStmt extends DdlStmt {
private void analyzeTargetTable(Analyzer analyzer) throws
AnalysisException {
// Get table
if (targetTable == null) {
- DatabaseIf db =
Env.getCurrentInternalCatalog().getDbOrAnalysisException(tblName.getDb());
- targetTable = (Table)
db.getTableOrAnalysisException(tblName.getTbl());
+ DatabaseIf db = analyzer.getEnv().getCatalogMgr()
+
.getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
+ if (db instanceof Database) {
+ targetTable = (Table)
db.getTableOrAnalysisException(tblName.getTbl());
+ } else if (db instanceof JdbcExternalDatabase) {
+ JdbcExternalTable jdbcTable = (JdbcExternalTable)
db.getTableOrAnalysisException(tblName.getTbl());
+ targetTable = jdbcTable.getJdbcTable();
+ } else {
+ throw new AnalysisException("Not support insert target
table.");
+ }
}
if (targetTable instanceof OlapTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 347bb33205..dec2a4298f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -18,6 +18,7 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
@@ -664,7 +665,7 @@ public class DatabaseTransactionMgr {
LOG.info("transaction:[{}] successfully committed", transactionState);
}
- public boolean waitForTransactionFinished(Database db, long transactionId,
long timeoutMillis)
+ public boolean waitForTransactionFinished(DatabaseIf db, long
transactionId, long timeoutMillis)
throws TransactionCommitFailedException {
TransactionState transactionState = null;
readLock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index f12e219771..3c84056789 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -18,6 +18,7 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
@@ -243,13 +244,13 @@ public class GlobalTransactionMgr implements Writable {
dbTransactionMgr.commitTransaction(null, transactionId, null, null,
true);
}
- public boolean commitAndPublishTransaction(Database db, List<Table>
tableList, long transactionId,
+ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
throws UserException {
return commitAndPublishTransaction(db, tableList, transactionId,
tabletCommitInfos, timeoutMillis, null);
}
- public boolean commitAndPublishTransaction(Database db, List<Table>
tableList, long transactionId,
+ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
throws UserException {
diff --git a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
index 1d7cfa7a41..22a7349c64 100644
--- a/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
+++ b/regression-test/data/jdbc_catalog_p0/test_mysql_jdbc_catalog.out
@@ -164,6 +164,19 @@ bca 2022-11-02 2022-11-02 8012 vivo
1.12345 1.12345 1.12345 1.12345 1.12345 1.12345
123456789012345678901234567890123.12345
12345678901234567890123456789012.12345
1234567890123456789012345678901234.12345
123456789012345678901234567890123.12345
123456789012345678901234567890123456789012345678901234567890.12345
123456789012345678901234567890123456789012345678901234567890.12345
+-- !test_insert1 --
+doris1 18
+
+-- !test_insert2 --
+doris2 19
+doris3 20
+
+-- !test_insert3 --
+doris2 19
+doris2 19
+doris3 20
+doris3 20
+
-- !ex_tb1 --
{"k1":"v1", "k2":"v2"}
diff --git a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
index 9f45b5584c..cc5c0f50f2 100644
--- a/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
+++ b/regression-test/data/jdbc_catalog_p0/test_oracle_jdbc_catalog.out
@@ -41,3 +41,16 @@
7 \N \N \N \N 223-9 \N
8 \N \N \N \N \N 12 10:23:1.123457
+-- !test_insert1 --
+doris1 18
+
+-- !test_insert2 --
+doris2 19
+doris3 20
+
+-- !test_insert3 --
+doris2 19
+doris2 19
+doris3 20
+doris3 20
+
diff --git a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
index 3e1c7f8058..ac0ff72029 100644
--- a/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
+++ b/regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out
@@ -2136,6 +2136,19 @@ true abc def 2022-10-11 1.234 1
2 99 2022-10-22T10:59:59 34.123
1 980dd890-f7fe-4fff-999d-873516108b2e
2 980dd890-f7fe-4fff-999d-873516108b2e
+-- !test_insert1 --
+doris1 18
+
+-- !test_insert2 --
+doris2 19
+doris3 20
+
+-- !test_insert3 --
+doris2 19
+doris2 19
+doris3 20
+doris3 20
+
-- !test_old --
123 abc
123 abc
diff --git
a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
index 656fe06848..7132b638c1 100644
--- a/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
+++ b/regression-test/suites/jdbc_catalog_p0/test_mysql_jdbc_catalog.groovy
@@ -45,6 +45,7 @@ suite("test_mysql_jdbc_catalog", "p0") {
String ex_tb18 = "ex_tb18";
String ex_tb19 = "ex_tb19";
String ex_tb20 = "ex_tb20";
+ String test_insert = "test_insert";
sql """ADMIN SET FRONTEND CONFIG ("enable_decimal_conversion" =
"true");"""
sql """drop catalog if exists ${catalog_name} """
@@ -101,8 +102,20 @@ suite("test_mysql_jdbc_catalog", "p0") {
order_qt_ex_tb19 """ select * from ${ex_tb19} order by date_value; """
order_qt_ex_tb20 """ select * from ${ex_tb20} order by
decimal_normal; """
- sql """drop catalog if exists ${catalog_name} """
- sql """drop resource if exists ${resource_name}"""
+ // test insert
+ String uuid1 = UUID.randomUUID().toString();
+ sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18)
"""
+ order_qt_test_insert1 """ select name, age from ${test_insert} where
id = '${uuid1}' order by age """
+
+ String uuid2 = UUID.randomUUID().toString();
+ sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19),
('${uuid2}', 'doris3', 20) """
+ order_qt_test_insert2 """ select name, age from ${test_insert} where
id = '${uuid2}' order by age """
+
+ sql """ insert into ${test_insert} select * from ${test_insert} where
id = '${uuid2}' """
+ order_qt_test_insert3 """ select name, age from ${test_insert} where
id = '${uuid2}' order by age """
+
+ sql """ drop catalog if exists ${catalog_name} """
+ sql """ drop resource if exists ${resource_name} """
// test old create-catalog syntax for compatibility
sql """ CREATE CATALOG ${catalog_name} PROPERTIES (
@@ -113,9 +126,9 @@ suite("test_mysql_jdbc_catalog", "p0") {
"jdbc.driver_url" =
"https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar",
"jdbc.driver_class" = "com.mysql.cj.jdbc.Driver");
"""
- sql """switch ${catalog_name}"""
- sql """use ${ex_db_name}"""
+ sql """ switch ${catalog_name} """
+ sql """ use ${ex_db_name} """
order_qt_ex_tb1 """ select * from ${ex_tb1} order by id; """
- sql """drop resource if exists ${resource_name}"""
+ sql """ drop catalog if exists ${catalog_name} """
}
}
diff --git
a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
index 51a7b174f7..7ddcce6c10 100644
--- a/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
+++ b/regression-test/suites/jdbc_catalog_p0/test_oracle_jdbc_catalog.groovy
@@ -23,7 +23,8 @@ suite("test_oracle_jdbc_catalog", "p0") {
String internal_db_name = "regression_test_jdbc_catalog_p0";
String ex_db_name = "DORIS_TEST";
String oracle_port = context.config.otherConfigs.get("oracle_11_port");
- String SID = "XE"
+ String SID = "XE";
+ String test_insert = "TEST_INSERT";
String inDorisTable = "doris_in_tb";
@@ -68,6 +69,17 @@ suite("test_oracle_jdbc_catalog", "p0") {
// So instead of qt, we're using sql here.
sql """ select * from TEST_RAW order by ID; """
+ // test insert
+ String uuid1 = UUID.randomUUID().toString();
+ sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18)
"""
+ order_qt_test_insert1 """ select name, age from ${test_insert} where
id = '${uuid1}' order by age """
+
+ String uuid2 = UUID.randomUUID().toString();
+ sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19),
('${uuid2}', 'doris3', 20) """
+ order_qt_test_insert2 """ select name, age from ${test_insert} where
id = '${uuid2}' order by age """
+
+ sql """ insert into ${test_insert} select * from ${test_insert} where
id = '${uuid2}' """
+ order_qt_test_insert3 """ select name, age from ${test_insert} where
id = '${uuid2}' order by age """
sql """drop catalog if exists ${catalog_name} """
sql """drop resource if exists jdbc_resource_catalog_pg"""
diff --git a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
index 71462cc307..883d15be9e 100644
--- a/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
+++ b/regression-test/suites/jdbc_catalog_p0/test_pg_jdbc_catalog.groovy
@@ -25,6 +25,7 @@ suite("test_pg_jdbc_catalog", "p0") {
String ex_schema_name2 = "catalog_pg_test";
String pg_port = context.config.otherConfigs.get("pg_14_port");
String inDorisTable = "doris_in_tb";
+ String test_insert = "test_insert";
sql """drop catalog if exists ${catalog_name} """
sql """drop resource if exists ${resource_name}"""
@@ -73,6 +74,18 @@ suite("test_pg_jdbc_catalog", "p0") {
order_qt_test13 """ select * from test11 order by id; """
order_qt_test14 """ select * from test12 order by id; """
+ // test insert
+ String uuid1 = UUID.randomUUID().toString();
+ sql """ insert into ${test_insert} values ('${uuid1}', 'doris1', 18)
"""
+ order_qt_test_insert1 """ select name, age from ${test_insert} where
id = '${uuid1}' order by age """
+
+ String uuid2 = UUID.randomUUID().toString();
+ sql """ insert into ${test_insert} values ('${uuid2}', 'doris2', 19),
('${uuid2}', 'doris3', 20) """
+ order_qt_test_insert2 """ select name, age from ${test_insert} where
id = '${uuid2}' order by age """
+
+ sql """ insert into ${test_insert} select * from ${test_insert} where
id = '${uuid2}' """
+ order_qt_test_insert3 """ select name, age from ${test_insert} where
id = '${uuid2}' order by age """
+
sql """drop catalog if exists ${catalog_name} """
sql """drop resource if exists ${resource_name}"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]