This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/mysql-connector
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3ec82019cc22efd50acf07eb077b9347a99ec887
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jun 5 21:15:18 2025 +0800

    Support query from mysql
---
 .../processor/TableFunctionDataProcessor.java      |   5 +
 .../processor/TableFunctionLeafProcessor.java      |   5 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 iotdb-core/datanode/pom.xml                        |   5 +
 .../function/TableFunctionLeafOperator.java        |   4 +-
 .../process/function/TableFunctionOperator.java    |   4 +
 .../function/tvf/connector/JDBCConnectionPool.java | 108 +++++++++
 .../tvf/connector/MySqlConnectorTableFunction.java | 251 +++++++++++++++++++++
 .../tvf/connector/converter/BinaryConverter.java   |  20 ++
 .../tvf/connector/converter/BlobConverter.java     |  21 ++
 .../tvf/connector/converter/BooleanConverter.java  |  19 ++
 .../tvf/connector/converter/DateConverter.java     |  22 ++
 .../tvf/connector/converter/DoubleConverter.java   |  19 ++
 .../tvf/connector/converter/FloatConverter.java    |  19 ++
 .../tvf/connector/converter/Int32Converter.java    |  19 ++
 .../tvf/connector/converter/Int64Converter.java    |  19 ++
 .../connector/converter/ResultSetConverter.java    |  11 +
 .../tvf/connector/converter/StringConverter.java   |  22 ++
 .../tvf/connector/converter/TimeConverter.java     |  23 ++
 .../connector/converter/TimestampConverter.java    |  23 ++
 pom.xml                                            |   1 +
 21 files changed, 621 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java
index aa78e50f4d8..0fe40fc894b 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java
@@ -65,4 +65,9 @@ public interface TableFunctionDataProcessor {
       List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
     // do nothing
   }
+
+  /**
+   * This method is mainly used to release the resources used in the UDF. The default
+   * implementation does nothing, so only processors that actually hold resources need to
+   * override it.
+   */
+  default void beforeDestroy() {
+    // do nothing
+  }
 }
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
index 41750fc3b53..ab8763f3c4a 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
@@ -39,4 +39,9 @@ public interface TableFunctionLeafProcessor {
 
   /** This method is called to determine if the processor has finished 
processing all data. */
   boolean isFinish();
+
+  /**
+   * This method is mainly used to release the resources used in the UDF. The default
+   * implementation does nothing, so only processors that actually hold resources need to
+   * override it.
+   */
+  default void beforeDestroy() {
+    // do nothing
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 3f7eb8f6507..3b8367716bf 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -142,6 +142,8 @@ public enum TSStatusCode {
   QUERY_TIMEOUT(720),
   PLAN_FAILED_NETWORK_PARTITION(721),
   CANNOT_FETCH_FI_STATE(722),
+  // A statement could not be executed against an external database.
+  EXECUTION_FAILED_IN_EXTERNAL_DB(723),
+  // Resources of an external database could not be released; must not share a code with 723.
+  CLOSE_FAILED_IN_EXTERNAL_DB(724),
 
   // Arithmetic
   NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 0a4d9418916..c0ae646b895 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -313,6 +313,11 @@
             <groupId>org.java-websocket</groupId>
             <artifactId>Java-WebSocket</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <version>${mysql.jdbc.version}</version>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
index 47f2ec9de35..fcce22ea0f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
@@ -82,7 +82,9 @@ public class TableFunctionLeafOperator implements 
ProcessOperator {
   }
 
   @Override
-  public void close() throws Exception {}
+  public void close() throws Exception {
+    // Let the leaf processor release whatever resources it is still holding.
+    processor.beforeDestroy();
+  }
 
   @Override
   public boolean isFinished() throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index 4e9f7e436a5..c00e05ec17f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@ -174,6 +174,7 @@ public class TableFunctionOperator implements 
ProcessOperator {
           processor.finish(properColumnBuilders, passThroughIndexBuilder);
           resultTsBlocks.addAll(buildTsBlock(properColumnBuilders, 
passThroughIndexBuilder));
           partitionCache.clear();
+          processor.beforeDestroy();
           processor = null;
           return resultTsBlocks.poll();
         } else {
@@ -259,6 +260,9 @@ public class TableFunctionOperator implements 
ProcessOperator {
   public void close() throws Exception {
     partitionCache.close();
     inputOperator.close();
+    // A processor that is still referenced here has not been released yet, so release it now.
+    // NOTE(review): if either close() call above throws, this is skipped; consider try/finally.
+    if (processor != null) {
+      processor.beforeDestroy();
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/JDBCConnectionPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/JDBCConnectionPool.java
new file mode 100644
index 00000000000..2f7d0c8d821
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/JDBCConnectionPool.java
@@ -0,0 +1,108 @@
+package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter.*;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * Static helpers shared by the JDBC-backed table functions: opening connections and mapping
+ * {@link java.sql.Types} codes to UDF types and to the converters that read them.
+ *
+ * <p>Despite the name, nothing is pooled here: every {@link #getConnection} call asks {@link
+ * DriverManager} for a connection and the caller is responsible for closing it.
+ */
+public class JDBCConnectionPool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JDBCConnectionPool.class);
+
+  private JDBCConnectionPool() {}
+
+  static {
+    try {
+      // Loading (and thereby initializing) the driver class is enough; creating an instance
+      // through the deprecated Class#newInstance adds nothing.
+      Class.forName("com.mysql.cj.jdbc.Driver");
+    } catch (ClassNotFoundException e) {
+      LOGGER.warn("Failed to initialize mysql JDBC driver", e);
+    }
+  }
+
+  /**
+   * Opens a connection through {@link DriverManager}.
+   *
+   * @param url JDBC URL of the target database
+   * @param userName user to connect as
+   * @param password password of that user
+   * @return the connection; the caller must close it
+   * @throws SQLException if the connection cannot be established
+   */
+  public static Connection getConnection(String url, String userName, String password)
+      throws SQLException {
+    return DriverManager.getConnection(url, userName, password);
+  }
+
+  /**
+   * Maps a {@link java.sql.Types} code to the UDF type of the output column. Keep the cases in
+   * sync with {@link #getResultSetConverter(int)}.
+   *
+   * @param type a constant from {@link java.sql.Types}
+   * @return the UDF type used to expose that column
+   * @throws SemanticException if the JDBC type has no supported mapping
+   */
+  public static Type translateJDBCTypeToUDFType(int type) {
+    switch (type) {
+      case java.sql.Types.TINYINT:
+      case java.sql.Types.SMALLINT:
+      case java.sql.Types.INTEGER:
+        return Type.INT32;
+      case java.sql.Types.BIGINT:
+        return Type.INT64;
+      case java.sql.Types.REAL: // single-precision columns (MySQL FLOAT) are reported as REAL
+      case java.sql.Types.FLOAT:
+        return Type.FLOAT;
+      case java.sql.Types.DOUBLE:
+        return Type.DOUBLE;
+      case java.sql.Types.CHAR:
+      case java.sql.Types.VARCHAR:
+      case java.sql.Types.LONGVARCHAR:
+      case java.sql.Types.NCHAR:
+      case java.sql.Types.NVARCHAR:
+      case java.sql.Types.LONGNVARCHAR:
+        return Type.STRING;
+      case java.sql.Types.DATE:
+        return Type.DATE;
+      case java.sql.Types.TIME: // TIME is exposed as a timestamp as well
+      case java.sql.Types.TIMESTAMP:
+        return Type.TIMESTAMP;
+      case java.sql.Types.BINARY:
+      case java.sql.Types.VARBINARY:
+      case java.sql.Types.LONGVARBINARY:
+      case java.sql.Types.BLOB:
+        return Type.BLOB;
+      case java.sql.Types.BOOLEAN:
+        return Type.BOOLEAN;
+      default:
+        throw new SemanticException("Unsupported JDBC type: " + type);
+    }
+  }
+
+  /**
+   * Creates the converter that reads a column of the given {@link java.sql.Types} code from a
+   * result set. Keep the cases in sync with {@link #translateJDBCTypeToUDFType(int)}.
+   *
+   * @param type a constant from {@link java.sql.Types}
+   * @return a new converter for that column type
+   * @throws SemanticException if the JDBC type has no supported mapping
+   */
+  public static ResultSetConverter getResultSetConverter(int type) {
+    switch (type) {
+      case java.sql.Types.TINYINT:
+      case java.sql.Types.SMALLINT:
+      case java.sql.Types.INTEGER:
+        return new Int32Converter();
+      case java.sql.Types.BIGINT:
+        return new Int64Converter();
+      case java.sql.Types.REAL:
+      case java.sql.Types.FLOAT:
+        return new FloatConverter();
+      case java.sql.Types.DOUBLE:
+        return new DoubleConverter();
+      case java.sql.Types.CHAR:
+      case java.sql.Types.VARCHAR:
+      case java.sql.Types.LONGVARCHAR:
+      case java.sql.Types.NCHAR:
+      case java.sql.Types.NVARCHAR:
+      case java.sql.Types.LONGNVARCHAR:
+        return new StringConverter();
+      case java.sql.Types.DATE:
+        return new DateConverter();
+      case java.sql.Types.TIME:
+        return new TimeConverter();
+      case java.sql.Types.TIMESTAMP:
+        return new TimestampConverter();
+      case java.sql.Types.BINARY:
+      case java.sql.Types.VARBINARY:
+      case java.sql.Types.LONGVARBINARY:
+        return new BinaryConverter();
+      case java.sql.Types.BLOB:
+        return new BlobConverter();
+      case java.sql.Types.BOOLEAN:
+        return new BooleanConverter();
+      default:
+        throw new SemanticException("Unsupported JDBC type: " + type);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/MySqlConnectorTableFunction.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/MySqlConnectorTableFunction.java
new file mode 100644
index 00000000000..3981d290b10
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/MySqlConnectorTableFunction.java
@@ -0,0 +1,251 @@
+package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector;
+
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter.ResultSetConverter;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.JDBCConnectionPool.translateJDBCTypeToUDFType;
+import static org.apache.iotdb.rpc.TSStatusCode.CLOSE_FAILED_IN_EXTERNAL_DB;
+import static 
org.apache.iotdb.rpc.TSStatusCode.EXECUTION_FAILED_IN_EXTERNAL_DB;
+
+public class MySqlConnectorTableFunction implements TableFunction {
+
+  /**
+   * Serializable description of one query: the SQL text, the connection settings and the JDBC
+   * type code of every result column, in column order.
+   */
+  private static class MySqlConnectorTableFunctionHandle implements TableFunctionHandle {
+    String sql;
+    String url;
+    String userName;
+    String password;
+    int[] types;
+
+    /** Creates an empty handle; the fields stay unset until {@link #deserialize} fills them. */
+    public MySqlConnectorTableFunctionHandle() {}
+
+    public MySqlConnectorTableFunctionHandle(
+        String sql, String url, String userName, String password, int[] types) {
+      this.sql = sql;
+      this.url = url;
+      this.userName = userName;
+      this.password = password;
+      this.types = types;
+    }
+
+    /** Builds one converter per result column, in the same order as {@code types}. */
+    List<ResultSetConverter> getConverters() {
+      List<ResultSetConverter> perColumn = new ArrayList<>(types.length);
+      for (int i = 0; i < types.length; i++) {
+        perColumn.add(JDBCConnectionPool.getResultSetConverter(types[i]));
+      }
+      return perColumn;
+    }
+
+    @Override
+    public byte[] serialize() {
+      try (PublicBAOS sink = new PublicBAOS();
+          DataOutputStream out = new DataOutputStream(sink)) {
+        // Layout: sql, url, userName, password, column count, then one type code per column.
+        ReadWriteIOUtils.write(sql, out);
+        ReadWriteIOUtils.write(url, out);
+        ReadWriteIOUtils.write(userName, out);
+        ReadWriteIOUtils.write(password, out);
+        ReadWriteIOUtils.write(types.length, out);
+        for (int i = 0; i < types.length; i++) {
+          ReadWriteIOUtils.write(types[i], out);
+        }
+        out.flush();
+        return sink.toByteArray();
+      } catch (IOException e) {
+        String message =
+            String.format(
+                "Error occurred while serializing MySqlConnectorTableFunctionHandle: %s",
+                e.getMessage());
+        throw new IoTDBRuntimeException(
+            message, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      }
+    }
+
+    @Override
+    public void deserialize(byte[] bytes) {
+      // Reads the fields back in exactly the order serialize() wrote them.
+      ByteBuffer in = ByteBuffer.wrap(bytes);
+      sql = ReadWriteIOUtils.readString(in);
+      url = ReadWriteIOUtils.readString(in);
+      userName = ReadWriteIOUtils.readString(in);
+      password = ReadWriteIOUtils.readString(in);
+      int columnCount = ReadWriteIOUtils.readInt(in);
+      types = new int[columnCount];
+      for (int column = 0; column < columnCount; column++) {
+        types[column] = ReadWriteIOUtils.readInt(in);
+      }
+    }
+  }
+
+  private static final String SQL = "SQL";
+  private static final String URL = "URL";
+  private static final String DEFAULT_URL = 
"jdbc:mysql://localhost:3306/test?";
+  private static final String USERNAME = "USERNAME";
+  private static final String DEFAULT_USERNAME = "root";
+  private static final String PASSWORD = "PASSWORD";
+  private static final String DEFAULT_PASSWORD = "root";
+
+  /**
+   * Declares the four scalar string arguments of the function. Only {@code SQL} has no default
+   * value; {@code URL}, {@code USERNAME} and {@code PASSWORD} fall back to their defaults.
+   */
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    ParameterSpecification sqlSpec =
+        ScalarParameterSpecification.builder().name(SQL).type(Type.STRING).build();
+    ParameterSpecification urlSpec =
+        ScalarParameterSpecification.builder()
+            .name(URL)
+            .type(Type.STRING)
+            .defaultValue(DEFAULT_URL)
+            .build();
+    ParameterSpecification userNameSpec =
+        ScalarParameterSpecification.builder()
+            .name(USERNAME)
+            .type(Type.STRING)
+            .defaultValue(DEFAULT_USERNAME)
+            .build();
+    ParameterSpecification passwordSpec =
+        ScalarParameterSpecification.builder()
+            .name(PASSWORD)
+            .type(Type.STRING)
+            .defaultValue(DEFAULT_PASSWORD)
+            .build();
+    return Arrays.asList(sqlSpec, urlSpec, userNameSpec, passwordSpec);
+  }
+
+  /**
+   * Prepares the statement on the external database to discover the result columns, and builds
+   * both the output schema and the handle handed to the processor.
+   *
+   * @param arguments the bound arguments, keyed by parameter name
+   * @return the analysis carrying the output schema and the serializable handle
+   * @throws SemanticException if the database rejects the connection or statement, if the
+   *     statement produces no result set, or if a column has an unsupported JDBC type
+   */
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException {
+
+    String sql = (String) ((ScalarArgument) arguments.get(SQL)).getValue();
+    String url = (String) ((ScalarArgument) arguments.get(URL)).getValue();
+    String userName = (String) ((ScalarArgument) arguments.get(USERNAME)).getValue();
+    String password = (String) ((ScalarArgument) arguments.get(PASSWORD)).getValue();
+
+    DescribedSchema.Builder schemaBuilder = DescribedSchema.builder();
+    int[] types;
+    try (Connection connection = JDBCConnectionPool.getConnection(url, userName, password);
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      ResultSetMetaData metaData = statement.getMetaData();
+      if (metaData == null) {
+        // JDBC allows null here, e.g. for statements that do not produce a result set.
+        throw new SemanticException("The SQL statement does not return a result set: " + sql);
+      }
+      int columnCount = metaData.getColumnCount();
+      types = new int[columnCount];
+      for (int i = 1; i <= columnCount; i++) {
+        int type = metaData.getColumnType(i);
+        // The label honors an AS alias and equals the column name when there is none.
+        schemaBuilder.addField(metaData.getColumnLabel(i), translateJDBCTypeToUDFType(type));
+        types[i - 1] = type;
+      }
+    } catch (SQLException e) {
+      throw new SemanticException(e);
+    }
+    MySqlConnectorTableFunctionHandle handle =
+        new MySqlConnectorTableFunctionHandle(sql, url, userName, password, types);
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(schemaBuilder.build())
+        .handle(handle)
+        .build();
+  }
+
+  @Override
+  public TableFunctionHandle createTableFunctionHandle() {
+    // Returns an empty handle; presumably it is populated afterwards through deserialize(byte[]).
+    return new MySqlConnectorTableFunctionHandle();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(
+      TableFunctionHandle tableFunctionHandle) {
+    // Only a split (leaf) processor is supplied by this provider.
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionLeafProcessor getSplitProcessor() {
+        // The cast assumes the handle was created by this table function.
+        return new MysqlProcessor((MySqlConnectorTableFunctionHandle) 
tableFunctionHandle);
+      }
+    };
+  }
+
+  /**
+   * Leaf processor that runs the handle's SQL on the external database and copies the result
+   * set into the output column builders, one bounded batch per {@link #process} call.
+   */
+  private static class MysqlProcessor implements TableFunctionLeafProcessor {
+    private static final int MAX_TSBLOCK_SIZE_IN_BYTES =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    private static final int MAX_TSBLOCK_LINE_NUMBER =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    private final MySqlConnectorTableFunctionHandle handle;
+    private final List<ResultSetConverter> converters;
+    private Connection connection;
+    private Statement statement;
+    private ResultSet resultSet;
+    private boolean finished = false;
+
+    MysqlProcessor(MySqlConnectorTableFunctionHandle handle) {
+      this.handle = handle;
+      this.converters = handle.getConverters();
+    }
+
+    /**
+     * Opens the connection and executes the query. If any step fails, whatever was already
+     * opened is closed again before the failure is reported.
+     *
+     * @throws IoTDBRuntimeException with {@code EXECUTION_FAILED_IN_EXTERNAL_DB} on SQL errors
+     */
+    @Override
+    public void beforeStart() {
+      try {
+        this.connection =
+            JDBCConnectionPool.getConnection(handle.url, handle.userName, handle.password);
+        this.statement = connection.createStatement();
+        this.resultSet = statement.executeQuery(handle.sql);
+      } catch (SQLException e) {
+        try {
+          closeResources();
+        } catch (SQLException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw new IoTDBRuntimeException(e, EXECUTION_FAILED_IN_EXTERNAL_DB.getStatusCode(), true);
+      }
+    }
+
+    /**
+     * Appends rows to the builders until the result set is exhausted or the batch reaches the
+     * configured size or row limit. {@link #isFinish()} reports false only when a limit stopped
+     * the batch early.
+     *
+     * @throws IoTDBRuntimeException with {@code EXECUTION_FAILED_IN_EXTERNAL_DB} on SQL errors
+     */
+    @Override
+    public void process(List<ColumnBuilder> columnBuilders) {
+      finished = true;
+      try {
+        int count = 0;
+        while (resultSet.next()) {
+          // Recomputed for every row: total retained size of all builders after this row.
+          long retainedSize = 0;
+          for (int i = 0, size = columnBuilders.size(); i < size; i++) {
+            converters.get(i).append(resultSet, i + 1, columnBuilders.get(i));
+            retainedSize += columnBuilders.get(i).getRetainedSizeInBytes();
+          }
+          count++;
+          if (retainedSize >= MAX_TSBLOCK_SIZE_IN_BYTES || count >= MAX_TSBLOCK_LINE_NUMBER) {
+            finished = false;
+            break;
+          }
+        }
+      } catch (SQLException e) {
+        throw new IoTDBRuntimeException(e, EXECUTION_FAILED_IN_EXTERNAL_DB.getStatusCode(), true);
+      }
+    }
+
+    @Override
+    public boolean isFinish() {
+      return finished;
+    }
+
+    /**
+     * Closes the result set, statement and connection. Every resource gets its close attempt
+     * even when an earlier one fails.
+     *
+     * @throws IoTDBRuntimeException with {@code CLOSE_FAILED_IN_EXTERNAL_DB} if any close fails
+     */
+    @Override
+    public void beforeDestroy() {
+      try {
+        closeResources();
+      } catch (SQLException e) {
+        throw new IoTDBRuntimeException(e, CLOSE_FAILED_IN_EXTERNAL_DB.getStatusCode(), true);
+      }
+    }
+
+    /**
+     * Closes the result set, then the statement, then the connection, skipping those that were
+     * never opened. The first failure is rethrown once all three were attempted; later failures
+     * are attached to it as suppressed exceptions.
+     */
+    private void closeResources() throws SQLException {
+      SQLException failure = null;
+      try {
+        if (resultSet != null) {
+          resultSet.close();
+        }
+      } catch (SQLException e) {
+        failure = e;
+      }
+      try {
+        if (statement != null) {
+          statement.close();
+        }
+      } catch (SQLException e) {
+        failure = chain(failure, e);
+      }
+      try {
+        if (connection != null) {
+          connection.close();
+        }
+      } catch (SQLException e) {
+        failure = chain(failure, e);
+      }
+      // Drop the references so that a repeated call has nothing left to close.
+      resultSet = null;
+      statement = null;
+      connection = null;
+      if (failure != null) {
+        throw failure;
+      }
+    }
+
+    /** Returns the first failure, recording {@code next} on it as suppressed when both exist. */
+    private static SQLException chain(SQLException first, SQLException next) {
+      if (first == null) {
+        return next;
+      }
+      first.addSuppressed(next);
+      return first;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BinaryConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BinaryConverter.java
new file mode 100644
index 00000000000..8742244da0c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BinaryConverter.java
@@ -0,0 +1,20 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getBytes} and appends it as a {@link Binary}, or appends null. */
+public class BinaryConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final byte[] bytes = row.getBytes(columnIndex);
+    if (bytes != null && !row.wasNull()) {
+      properColumnBuilder.writeBinary(new Binary(bytes));
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BlobConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BlobConverter.java
new file mode 100644
index 00000000000..7445d370600
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BlobConverter.java
@@ -0,0 +1,21 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.sql.Blob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getBlob} and appends its full content, or appends null. */
+public class BlobConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final Blob content = row.getBlob(columnIndex);
+    if (content != null && !row.wasNull()) {
+      // Blob positions are 1-based; the length is narrowed to int to size the byte array.
+      final int byteCount = (int) content.length();
+      properColumnBuilder.writeBinary(new Binary(content.getBytes(1, byteCount)));
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BooleanConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BooleanConverter.java
new file mode 100644
index 00000000000..00b9edbea68
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BooleanConverter.java
@@ -0,0 +1,19 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getBoolean} and appends the value, or appends null. */
+public class BooleanConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final boolean flag = row.getBoolean(columnIndex);
+    // A primitive getter cannot return null, so SQL NULL is detected through wasNull().
+    if (!row.wasNull()) {
+      properColumnBuilder.writeBoolean(flag);
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DateConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DateConverter.java
new file mode 100644
index 00000000000..c4bfd8798c4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DateConverter.java
@@ -0,0 +1,22 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Reads a column with {@code getDate} and appends it as an int encoded as {@code yyyyMMdd}
+ * (year * 10000 + month * 100 + day), or appends null.
+ */
+public class DateConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    java.sql.Date value = row.getDate(columnIndex);
+    if (value == null || row.wasNull()) {
+      properColumnBuilder.appendNull();
+    } else {
+      // toLocalDate() yields the same calendar fields as the deprecated getYear()/getMonth()/
+      // getDate() accessors, already as a real year and a 1-based month.
+      java.time.LocalDate date = value.toLocalDate();
+      properColumnBuilder.writeInt(
+          date.getYear() * 10_000 + date.getMonthValue() * 100 + date.getDayOfMonth());
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DoubleConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DoubleConverter.java
new file mode 100644
index 00000000000..9c6fcc07d5f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DoubleConverter.java
@@ -0,0 +1,19 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getDouble} and appends the value, or appends null. */
+public class DoubleConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final double number = row.getDouble(columnIndex);
+    // A primitive getter cannot return null, so SQL NULL is detected through wasNull().
+    if (!row.wasNull()) {
+      properColumnBuilder.writeDouble(number);
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/FloatConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/FloatConverter.java
new file mode 100644
index 00000000000..4be85a509f7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/FloatConverter.java
@@ -0,0 +1,19 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getFloat} and appends the value, or appends null. */
+public class FloatConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final float number = row.getFloat(columnIndex);
+    // A primitive getter cannot return null, so SQL NULL is detected through wasNull().
+    if (!row.wasNull()) {
+      properColumnBuilder.writeFloat(number);
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int32Converter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int32Converter.java
new file mode 100644
index 00000000000..a36f82390e3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int32Converter.java
@@ -0,0 +1,19 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getInt} and appends the value, or appends null. */
+public class Int32Converter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final int number = row.getInt(columnIndex);
+    // A primitive getter cannot return null, so SQL NULL is detected through wasNull().
+    if (!row.wasNull()) {
+      properColumnBuilder.writeInt(number);
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int64Converter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int64Converter.java
new file mode 100644
index 00000000000..27369c6a11f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int64Converter.java
@@ -0,0 +1,19 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getLong} and appends the value, or appends null. */
+public class Int64Converter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final long number = row.getLong(columnIndex);
+    // A primitive getter cannot return null, so SQL NULL is detected through wasNull().
+    if (!row.wasNull()) {
+      properColumnBuilder.writeLong(number);
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/ResultSetConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/ResultSetConverter.java
new file mode 100644
index 00000000000..47638ca1927
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/ResultSetConverter.java
@@ -0,0 +1,11 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Copies the value of one column of the current result-set row into a column builder. */
+public interface ResultSetConverter {
+  /**
+   * Reads the given column of the current row and appends it to the builder.
+   *
+   * @param row the result set, positioned on the row to read
+   * @param columnIndex index of the column to read, as passed to the {@code ResultSet} getters
+   * @param properColumnBuilder the builder that receives the value
+   * @throws SQLException if reading the column fails
+   */
+  void append(ResultSet row, int columnIndex, ColumnBuilder 
properColumnBuilder)
+      throws SQLException;
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/StringConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/StringConverter.java
new file mode 100644
index 00000000000..8d74d457a37
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/StringConverter.java
@@ -0,0 +1,22 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Reads a column with {@code getString} and appends it as a {@link Binary}, or appends null. */
+public class StringConverter implements ResultSetConverter {
+
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    String value = row.getString(columnIndex);
+    // Check the reference as well, like the other object-valued converters, so that a null
+    // string can never reach the Binary constructor.
+    if (value == null || row.wasNull()) {
+      properColumnBuilder.appendNull();
+    } else {
+      properColumnBuilder.writeBinary(new Binary(value, TSFileConfig.STRING_CHARSET));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimeConverter.java
new file mode 100644
index 00000000000..dc83dddb19f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimeConverter.java
@@ -0,0 +1,23 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads a column with {@code getTime} and appends its millisecond value, converted to the
+ * current timestamp precision, or appends null.
+ */
+public class TimeConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    final java.sql.Time time = row.getTime(columnIndex);
+    if (time != null && !row.wasNull()) {
+      final long millis = time.getTime();
+      properColumnBuilder.writeLong(
+          TimestampPrecisionUtils.convertToCurrPrecision(millis, TimeUnit.MILLISECONDS));
+      return;
+    }
+    properColumnBuilder.appendNull();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimestampConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimestampConverter.java
new file mode 100644
index 00000000000..96ff79a452c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimestampConverter.java
@@ -0,0 +1,23 @@
+package 
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter;
+
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads a column with {@code getTimestamp} and appends its millisecond value, converted to the
+ * current timestamp precision, or appends null.
+ */
+public class TimestampConverter implements ResultSetConverter {
+  @Override
+  public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder)
+      throws SQLException {
+    java.sql.Timestamp value = row.getTimestamp(columnIndex);
+    // Check the reference as well, like the other object-valued converters, so that
+    // value.getTime() can never be reached with a null timestamp.
+    if (value == null || row.wasNull()) {
+      properColumnBuilder.appendNull();
+    } else {
+      properColumnBuilder.writeLong(
+          TimestampPrecisionUtils.convertToCurrPrecision(value.getTime(), TimeUnit.MILLISECONDS));
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index ee463c157f7..7632de34dd9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,6 +176,7 @@
         <xz.version>1.9</xz.version>
         <zstd-jni.version>1.5.6-3</zstd-jni.version>
         <tsfile.version>2.1.0-250521-SNAPSHOT</tsfile.version>
+        <mysql.jdbc.version>9.3.0</mysql.jdbc.version>
     </properties>
     <!--
     if we claim dependencies in dependencyManagement, then we do not claim


Reply via email to