This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9bee4582e7a branch-2.1: [fix](query tvf): resolve column mismatch
error in JDBC query function (#54077) (#54249)
9bee4582e7a is described below
commit 9bee4582e7a058247dd6b7b61c6c13cb37b637c7
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Aug 4 19:44:27 2025 -0700
branch-2.1: [fix](query tvf): resolve column mismatch error in JDBC query
function (#54077) (#54249)
bp #54077
---------
Co-authored-by: XnY-wei <[email protected]>
Co-authored-by: weixingyu12 <[email protected]>
---
be/src/pipeline/exec/jdbc_scan_operator.cpp | 3 +-
be/src/pipeline/exec/jdbc_scan_operator.h | 1 +
be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 7 +-
be/src/vec/exec/scan/new_jdbc_scan_node.h | 1 +
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 13 ++-
be/src/vec/exec/scan/new_jdbc_scanner.h | 5 +-
be/src/vec/exec/vjdbc_connector.cpp | 7 +-
be/src/vec/exec/vjdbc_connector.h | 1 +
.../org/apache/doris/jdbc/BaseJdbcExecutor.java | 94 +++++++++++++++---
.../org/apache/doris/jdbc/MySQLJdbcExecutor.java | 105 +++++++++++----------
.../doris/datasource/jdbc/source/JdbcScanNode.java | 1 +
gensrc/thrift/PlanNodes.thrift | 1 +
gensrc/thrift/Types.thrift | 1 +
.../jdbc/test_doris_jdbc_catalog.out | Bin 3627 -> 3724 bytes
.../jdbc/test_doris_jdbc_catalog.groovy | 36 ++++---
15 files changed, 188 insertions(+), 88 deletions(-)
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp
b/be/src/pipeline/exec/jdbc_scan_operator.cpp
index 35ad7ec0490..e145a719e36 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -30,7 +30,7 @@ std::string JDBCScanLocalState::name_suffix() const {
Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
auto& p = _parent->cast<JDBCScanOperatorX>();
std::unique_ptr<vectorized::NewJdbcScanner> scanner =
vectorized::NewJdbcScanner::create_unique(
- state(), this, p._limit, p._tuple_id, p._query_string,
p._table_type,
+ state(), this, p._limit, p._tuple_id, p._query_string,
p._table_type, p._is_tvf,
_scanner_profile.get());
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
scanners->push_back(std::move(scanner));
@@ -45,6 +45,7 @@ JDBCScanOperatorX::JDBCScanOperatorX(ObjectPool* pool, const
TPlanNode& tnode, i
_query_string(tnode.jdbc_scan_node.query_string),
_table_type(tnode.jdbc_scan_node.table_type) {
_output_tuple_id = tnode.jdbc_scan_node.tuple_id;
+ _is_tvf = tnode.jdbc_scan_node.__isset.is_tvf ?
tnode.jdbc_scan_node.is_tvf : false;
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h
b/be/src/pipeline/exec/jdbc_scan_operator.h
index 825e01acc2a..07dfec37239 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.h
+++ b/be/src/pipeline/exec/jdbc_scan_operator.h
@@ -62,6 +62,7 @@ private:
TupleId _tuple_id;
std::string _query_string;
TOdbcTableType::type _table_type;
+ bool _is_tvf;
};
} // namespace doris::pipeline
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index f8219b4337e..f6a1e3e6ec1 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -44,6 +44,7 @@ NewJdbcScanNode::NewJdbcScanNode(ObjectPool* pool, const
TPlanNode& tnode,
_query_string(tnode.jdbc_scan_node.query_string),
_table_type(tnode.jdbc_scan_node.table_type) {
_output_tuple_id = tnode.jdbc_scan_node.tuple_id;
+ _is_tvf = tnode.jdbc_scan_node.__isset.is_tvf ?
tnode.jdbc_scan_node.is_tvf : false;
}
std::string NewJdbcScanNode::get_name() {
@@ -65,9 +66,9 @@ Status
NewJdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_eos == true) {
return Status::OK();
}
- std::unique_ptr<NewJdbcScanner> scanner =
- NewJdbcScanner::create_unique(_state, this, _limit_per_scanner,
_tuple_id,
- _query_string, _table_type,
_state->runtime_profile());
+ std::unique_ptr<NewJdbcScanner> scanner = NewJdbcScanner::create_unique(
+ _state, this, _limit_per_scanner, _tuple_id, _query_string,
_table_type, _is_tvf,
+ _state->runtime_profile());
RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
scanners->push_back(std::move(scanner));
return Status::OK();
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h
b/be/src/vec/exec/scan/new_jdbc_scan_node.h
index 54a8dc7d3dd..ee284b7e4a3 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.h
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h
@@ -51,6 +51,7 @@ private:
TupleId _tuple_id;
std::string _query_string;
TOdbcTableType::type _table_type;
+ bool _is_tvf;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 830b4e51383..cdcdf677e59 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -38,26 +38,30 @@
namespace doris::vectorized {
NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent,
int64_t limit,
const TupleId& tuple_id, const std::string&
query_string,
- TOdbcTableType::type table_type,
RuntimeProfile* profile)
+ TOdbcTableType::type table_type, bool is_tvf,
+ RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_jdbc_eos(false),
_tuple_id(tuple_id),
_query_string(query_string),
_tuple_desc(nullptr),
- _table_type(table_type) {
+ _table_type(table_type),
+ _is_tvf(is_tvf) {
_init_profile(get_parent()->_scanner_profile);
}
NewJdbcScanner::NewJdbcScanner(RuntimeState* state,
doris::pipeline::JDBCScanLocalState*
local_state, int64_t limit,
const TupleId& tuple_id, const std::string&
query_string,
- TOdbcTableType::type table_type,
RuntimeProfile* profile)
+ TOdbcTableType::type table_type, bool is_tvf,
+ RuntimeProfile* profile)
: VScanner(state, local_state, limit, profile),
_jdbc_eos(false),
_tuple_id(tuple_id),
_query_string(query_string),
_tuple_desc(nullptr),
- _table_type(table_type) {
+ _table_type(table_type),
+ _is_tvf(is_tvf) {
_init_profile(local_state->_scanner_profile);
}
@@ -97,6 +101,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& con
_jdbc_param.query_string = std::move(_query_string);
_jdbc_param.use_transaction = false; // not useful for scanner but only
sink.
_jdbc_param.table_type = _table_type;
+ _jdbc_param.is_tvf = _is_tvf;
_jdbc_param.connection_pool_min_size =
jdbc_table->connection_pool_min_size();
_jdbc_param.connection_pool_max_size =
jdbc_table->connection_pool_max_size();
_jdbc_param.connection_pool_max_life_time =
jdbc_table->connection_pool_max_life_time();
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h
b/be/src/vec/exec/scan/new_jdbc_scanner.h
index bda6d6b1dc8..5020241836b 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -48,10 +48,10 @@ public:
NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
const TupleId& tuple_id, const std::string& query_string,
- TOdbcTableType::type table_type, RuntimeProfile* profile);
+ TOdbcTableType::type table_type, bool is_tvf,
RuntimeProfile* profile);
NewJdbcScanner(RuntimeState* state, doris::pipeline::JDBCScanLocalState*
parent, int64_t limit,
const TupleId& tuple_id, const std::string& query_string,
- TOdbcTableType::type table_type, RuntimeProfile* profile);
+ TOdbcTableType::type table_type, bool is_tvf,
RuntimeProfile* profile);
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
@@ -87,6 +87,7 @@ private:
const TupleDescriptor* _tuple_desc = nullptr;
// the sql query database type: like mysql, PG...
TOdbcTableType::type _table_type;
+ bool _is_tvf;
// Scanner of JDBC.
std::unique_ptr<JdbcConnector> _jdbc_connector;
JdbcConnectorParam _jdbc_param;
diff --git a/be/src/vec/exec/vjdbc_connector.cpp
b/be/src/vec/exec/vjdbc_connector.cpp
index f3f4b4d8a1c..1a68e5d6c61 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -144,6 +144,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
ctor_params.__set_connection_pool_cache_clear_time(
config::jdbc_connection_pool_cache_clear_time_sec);
ctor_params.__set_connection_pool_keep_alive(_conn_param.connection_pool_keep_alive);
+ ctor_params.__set_is_tvf(_conn_param.is_tvf);
jbyteArray ctor_params_bytes;
// Pushed frame will be popped when jni_frame goes out-of-scope.
@@ -209,8 +210,10 @@ Status JdbcConnector::query() {
return Status::InternalError("GetJniExceptionMsg meet error,
query={}, msg={}",
_conn_param.query_string,
status.to_string());
}
- if (colunm_count != materialize_num) {
- return Status::InternalError("input and output column num not
equal of jdbc query.");
+ if (colunm_count < materialize_num) {
+ return Status::InternalError(
+ "JDBC query returned fewer columns ({}) than required
({}).", colunm_count,
+ materialize_num);
}
}
diff --git a/be/src/vec/exec/vjdbc_connector.h
b/be/src/vec/exec/vjdbc_connector.h
index a09d390dc7c..ed202e21350 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -56,6 +56,7 @@ struct JdbcConnectorParam {
std::string table_name;
bool use_transaction = false;
TOdbcTableType::type table_type;
+ bool is_tvf = false;
int32_t connection_pool_min_size = -1;
int32_t connection_pool_max_size = -1;
int32_t connection_pool_max_wait_time = -1;
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
index 383f1fe9aa1..82eb3cfeefa 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java
@@ -56,6 +56,7 @@ import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -79,6 +80,19 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
protected String jdbcDriverVersion;
private static final Map<URL, ClassLoader> classLoaderMap =
Maps.newConcurrentMap();
+ // col name(lowercase) -> index in resultSetMetaData
+ // this map is only used for "query()" tvf, so only valid if isTvf is true.
+ // Because for "query()" tvf, the sql string is written by user, so the
column name in resultSetMetaData
+ // maybe larger than the column name in outputTable.
+ // For example, if the sql is "select a from query('select a,b from tbl')",
+ // the column num in resultSetMetaData is 2, but the outputTable only has
1 column "a".
+ // But if the sql is "select a from (select a,b from tbl)x",
+ // the column num in resultSetMetaData is 1, and the outputTable also has
1 column "a".
+ // Because the planner will do the column pruning before generating the
sql string.
+ // So, for query() tvf, we need to map the column name in outputTable to
the column index in resultSetMetaData.
+ private Map<String, Integer> resultSetColumnMap = null;
+ private boolean isTvf = false;
+
public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
setJdbcDriverSystemProperties();
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -107,6 +121,7 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
init(config, request.statement);
this.jdbcDriverVersion = getJdbcDriverVersion();
+ this.isTvf = request.isSetIsTvf() ? request.is_tvf : false;
}
public void close() throws Exception {
@@ -206,32 +221,59 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
if (isNullableString == null || replaceString == null) {
throw new IllegalArgumentException(
- "Output parameters 'is_nullable' and 'replace_string'
are required.");
+ "Output parameters 'is_nullable' and 'replace_string' are
required.");
}
String[] nullableList = isNullableString.split(",");
String[] replaceStringList = replaceString.split(",");
curBlockRows = 0;
- int columnCount = resultSetMetaData.getColumnCount();
- initializeBlock(columnCount, replaceStringList, batchSize,
outputTable);
+ int outputColumnCount = outputTable.getColumns().length;
+ initializeBlock(outputColumnCount, replaceStringList, batchSize,
outputTable);
+
+ // the resultSetColumnMap is only for "query()" tvf
+ if (this.isTvf && this.resultSetColumnMap == null) {
+ this.resultSetColumnMap = new HashMap<>();
+ int resultSetColumnCount = resultSetMetaData.getColumnCount();
+ for (int i = 1; i <= resultSetColumnCount; i++) {
+ String columnName =
resultSetMetaData.getColumnName(i).trim().toLowerCase();
+ resultSetColumnMap.put(columnName, i);
+ }
+ }
do {
- for (int i = 0; i < columnCount; ++i) {
- ColumnType type = outputTable.getColumnType(i);
- block.get(i)[curBlockRows] = getColumnValue(i, type,
replaceStringList);
+ for (int i = 0; i < outputColumnCount; ++i) {
+ String outputColumnName = outputTable.getFields()[i];
+ int columnIndex = getRealColumnIndex(outputColumnName, i);
+ if (columnIndex > -1) {
+ ColumnType type = convertTypeIfNecessary(i,
outputTable.getColumnType(i), replaceStringList);
+ block.get(i)[curBlockRows] =
getColumnValue(columnIndex, type, replaceStringList);
+ } else {
+ throw new RuntimeException("Column not found in
resultSetColumnMap: " + outputColumnName);
+ }
}
curBlockRows++;
} while (curBlockRows < batchSize && resultSet.next());
- for (int i = 0; i < columnCount; ++i) {
- ColumnType type = outputTable.getColumnType(i);
- Object[] columnData = block.get(i);
- Class<?> componentType =
columnData.getClass().getComponentType();
- Object[] newColumn = (Object[])
Array.newInstance(componentType, curBlockRows);
- System.arraycopy(columnData, 0, newColumn, 0, curBlockRows);
- boolean isNullable = Boolean.parseBoolean(nullableList[i]);
- outputTable.appendData(i, newColumn, getOutputConverter(type,
replaceStringList[i]), isNullable);
+ for (int i = 0; i < outputColumnCount; ++i) {
+ String outputColumnName = outputTable.getFields()[i];
+ int columnIndex = getRealColumnIndex(outputColumnName, i);
+ if (columnIndex > -1) {
+ ColumnType type = outputTable.getColumnType(i);
+ Object[] columnData = block.get(i);
+ Class<?> componentType =
columnData.getClass().getComponentType();
+ Object[] newColumn = (Object[])
Array.newInstance(componentType, curBlockRows);
+ System.arraycopy(columnData, 0, newColumn, 0,
curBlockRows);
+ boolean isNullable = Boolean.parseBoolean(nullableList[i]);
+ outputTable.appendData(
+ i,
+ newColumn,
+ getOutputConverter(type, replaceStringList[i]),
+ isNullable
+ );
+ } else {
+ throw new RuntimeException("Column not found in
resultSetColumnMap: " + outputColumnName);
+ }
}
} catch (Exception e) {
LOG.warn("jdbc get block address exception: ", e);
@@ -242,6 +284,14 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
return outputTable.getMetaAddress();
}
+ private int getRealColumnIndex(String outputColumnName, int
indexInOutputTable) {
+ // -1 because ResultSetMetaData column index starts from 1, but index
in outputTable starts from 0.
+ int columnIndex = this.isTvf
+ ?
resultSetColumnMap.getOrDefault(outputColumnName.toLowerCase(), 0) - 1 :
indexInOutputTable;
+ return columnIndex;
+ }
+
+
protected void initializeBlock(int columnCount, String[]
replaceStringList, int batchSizeNum,
VectorTable outputTable) {
for (int i = 0; i < columnCount; ++i) {
@@ -442,6 +492,19 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
protected abstract Object getColumnValue(int columnIndex, ColumnType type,
String[] replaceStringList)
throws SQLException;
+ /**
+ * Some special column types (like bitmap/hll in Doris) may need to be
converted to string.
+ * Subclass can override this method to handle such conversions.
+ *
+ * @param outputIdx
+ * @param origType
+ * @param replaceStringList
+ * @return
+ */
+ protected ColumnType convertTypeIfNecessary(int outputIdx, ColumnType
origType, String[] replaceStringList) {
+ return origType;
+ }
+
/*
| Type | Java Array Type
|
|---------------------------------------------|----------------------------|
@@ -650,3 +713,6 @@ public abstract class BaseJdbcExecutor implements
JdbcExecutor {
return hexString.toString();
}
}
+
+
+
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
index 4e5af95211b..9ceb613425a 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/MySQLJdbcExecutor.java
@@ -101,60 +101,67 @@ public class MySQLJdbcExecutor extends BaseJdbcExecutor {
}
}
+ @Override
+ protected ColumnType convertTypeIfNecessary(int outputIdx, ColumnType
origType, String[] replaceStringList) {
+ if (replaceStringList[outputIdx].equals("bitmap") ||
replaceStringList[outputIdx].equals("hll")) {
+ return new ColumnType(origType.getName(), Type.BYTE);
+ }
+ return origType;
+ }
+
@Override
protected Object getColumnValue(int columnIndex, ColumnType type, String[]
replaceStringList) throws SQLException {
- if (replaceStringList[columnIndex].equals("bitmap") ||
replaceStringList[columnIndex].equals("hll")) {
- byte[] data = resultSet.getBytes(columnIndex + 1);
- if (resultSet.wasNull()) {
- return null;
- }
- return data;
- } else {
- switch (type.getType()) {
- case BOOLEAN:
- return resultSet.getObject(columnIndex + 1, Boolean.class);
- case TINYINT:
- case SMALLINT:
- case LARGEINT:
+ switch (type.getType()) {
+ case BOOLEAN:
+ return resultSet.getObject(columnIndex + 1, Boolean.class);
+ case TINYINT:
+ case SMALLINT:
+ case LARGEINT:
+ return resultSet.getObject(columnIndex + 1);
+ case INT:
+ return resultSet.getObject(columnIndex + 1, Integer.class);
+ case BIGINT:
+ return resultSet.getObject(columnIndex + 1, Long.class);
+ case FLOAT:
+ return resultSet.getObject(columnIndex + 1, Float.class);
+ case DOUBLE:
+ return resultSet.getObject(columnIndex + 1, Double.class);
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ return resultSet.getObject(columnIndex + 1, BigDecimal.class);
+ case DATE:
+ case DATEV2:
+ return resultSet.getObject(columnIndex + 1, LocalDate.class);
+ case DATETIME:
+ case DATETIMEV2:
+ return resultSet.getObject(columnIndex + 1,
LocalDateTime.class);
+ case CHAR:
+ case VARCHAR:
+ case ARRAY:
+ return resultSet.getObject(columnIndex + 1, String.class);
+ case STRING: {
+ int jdbcType = resultSetMetaData.getColumnType(columnIndex +
1);
+ // If it is a time type in mysql, or use mysql driver connect
mariadb
+ // We need to obtain the string directly to ensure that we can
obtain a time other than 24 hours.
+ // If it is another database, such as oceanbase, this
processing will lose precision information,
+ // so the original processing method will be maintained for
the time being.
+ if (jdbcType == Types.TIME && config.getTableType() ==
TOdbcTableType.MYSQL) {
+ return resultSet.getString(columnIndex + 1);
+ } else {
return resultSet.getObject(columnIndex + 1);
- case INT:
- return resultSet.getObject(columnIndex + 1, Integer.class);
- case BIGINT:
- return resultSet.getObject(columnIndex + 1, Long.class);
- case FLOAT:
- return resultSet.getObject(columnIndex + 1, Float.class);
- case DOUBLE:
- return resultSet.getObject(columnIndex + 1, Double.class);
- case DECIMALV2:
- case DECIMAL32:
- case DECIMAL64:
- case DECIMAL128:
- return resultSet.getObject(columnIndex + 1,
BigDecimal.class);
- case DATE:
- case DATEV2:
- return resultSet.getObject(columnIndex + 1,
LocalDate.class);
- case DATETIME:
- case DATETIMEV2:
- return resultSet.getObject(columnIndex + 1,
LocalDateTime.class);
- case CHAR:
- case VARCHAR:
- case ARRAY:
- return resultSet.getObject(columnIndex + 1, String.class);
- case STRING: {
- int jdbcType = resultSetMetaData.getColumnType(columnIndex
+ 1);
- // If it is a time type in mysql, or use mysql driver
connect mariadb
- // We need to obtain the string directly to ensure that we
can obtain a time other than 24 hours.
- // If it is another database, such as oceanbase, this
processing will lose precision information,
- // so the original processing method will be maintained
for the time being.
- if (jdbcType == Types.TIME && config.getTableType() ==
TOdbcTableType.MYSQL) {
- return resultSet.getString(columnIndex + 1);
- } else {
- return resultSet.getObject(columnIndex + 1);
- }
}
- default:
- throw new IllegalArgumentException("Unsupported column
type: " + type.getType());
}
+ case BYTE: {
+ byte[] data = resultSet.getBytes(columnIndex + 1);
+ if (resultSet.wasNull()) {
+ return null;
+ }
+ return data;
+ }
+ default:
+ throw new IllegalArgumentException("Unsupported column type: "
+ type.getType());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 346c27a68bd..d1198bde357 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -282,6 +282,7 @@ public class JdbcScanNode extends ExternalScanNode {
msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
}
msg.jdbc_scan_node.setTableType(jdbcType);
+ msg.jdbc_scan_node.setIsTvf(isTableValuedFunction);
}
@Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7ccb12b3331..1f1b36782a0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -613,6 +613,7 @@ struct TJdbcScanNode {
2: optional string table_name
3: optional string query_string
4: optional Types.TOdbcTableType table_type
+ 5: optional bool is_tvf
}
struct TBrokerScanNode {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 336988e4b88..7b0111c1ed9 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -437,6 +437,7 @@ struct TJdbcExecutorCtorParams {
15: optional bool connection_pool_keep_alive
16: optional i64 catalog_id
17: optional string jdbc_driver_checksum
+ 18: optional bool is_tvf
}
struct TJavaUdfExecutorCtorParams {
diff --git
a/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out
b/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out
index 9695f628fee..220c756a0e2 100644
Binary files
a/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out and
b/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out differ
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
b/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
index c4fce17c62c..ec77abb51a4 100644
---
a/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
+++
b/regression-test/suites/external_table_p0/jdbc/test_doris_jdbc_catalog.groovy
@@ -231,20 +231,30 @@ suite("test_doris_jdbc_catalog",
"p0,external,doris,external_docker,external_doc
// test query tvf
qt_sql """desc function query("catalog" = "doris_jdbc_catalog", "query" =
"select * from regression_test_jdbc_catalog_p0.base");"""
- order_qt_sql """ select varchar_col,tinyint_col from query("catalog" =
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from
regression_test_jdbc_catalog_p0.base");"""
+ order_qt_sql1 """ select varchar_col,tinyint_col from query("catalog" =
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from
regression_test_jdbc_catalog_p0.base");"""
- order_qt_sql """ select tinyint_col,varchar_col from query("catalog" =
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from
regression_test_jdbc_catalog_p0.base");"""
+ order_qt_sql2 """ select tinyint_col,varchar_col from query("catalog" =
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from
regression_test_jdbc_catalog_p0.base");"""
- //clean
- qt_sql """select current_catalog()"""
- sql "switch internal"
- qt_sql """select current_catalog()"""
- sql "use regression_test_jdbc_catalog_p0"
- sql """ drop table if exists test_doris_jdbc_doris_in_tb """
- sql """ drop table if exists bowen_hll_test """
- sql """ drop table if exists base """
- sql """ drop table if exists all_null_tbl """
- sql """ drop table if exists arr """
- sql """ drop table if exists test_insert_order """
+ order_qt_sql3 """ select varchar_col from query("catalog" =
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from
regression_test_jdbc_catalog_p0.base");"""
+
+ order_qt_sql4 """ select tinyint_col from query("catalog" =
"doris_jdbc_catalog", "query" = "select varchar_col,tinyint_col from
regression_test_jdbc_catalog_p0.base");"""
+
+ order_qt_sql5 """ with tmp as (select varchar_col,tinyint_col from
query("catalog" = "doris_jdbc_catalog", "query" = "select
varchar_col,tinyint_col from regression_test_jdbc_catalog_p0.base")) select
tinyint_col from tmp;"""
+
+ order_qt_sql6 """ with tmp as (select varchar_col,tinyint_col from
query("catalog" = "doris_jdbc_catalog", "query" = "select
varchar_col,tinyint_col from regression_test_jdbc_catalog_p0.base")) select
tinyint_col,varchar_col from tmp;"""
+
+ order_qt_sql7 """ with tmp as (select tinyint_col,varchar_col from
query("catalog" = "doris_jdbc_catalog", "query" = "select
varchar_col,tinyint_col from regression_test_jdbc_catalog_p0.base")) select
tinyint_col from tmp;"""
+
+ // //clean
+ // qt_sql """select current_catalog()"""
+ // sql "switch internal"
+ // qt_sql """select current_catalog()"""
+ // sql "use regression_test_jdbc_catalog_p0"
+ // sql """ drop table if exists test_doris_jdbc_doris_in_tb """
+ // sql """ drop table if exists bowen_hll_test """
+ // sql """ drop table if exists base """
+ // sql """ drop table if exists all_null_tbl """
+ // sql """ drop table if exists arr """
+ // sql """ drop table if exists test_insert_order """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]