This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/mysql-connector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3ec82019cc22efd50acf07eb077b9347a99ec887 Author: JackieTien97 <[email protected]> AuthorDate: Thu Jun 5 21:15:18 2025 +0800 Support query from mysql --- .../processor/TableFunctionDataProcessor.java | 5 + .../processor/TableFunctionLeafProcessor.java | 5 + .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + iotdb-core/datanode/pom.xml | 5 + .../function/TableFunctionLeafOperator.java | 4 +- .../process/function/TableFunctionOperator.java | 4 + .../function/tvf/connector/JDBCConnectionPool.java | 108 +++++++++ .../tvf/connector/MySqlConnectorTableFunction.java | 251 +++++++++++++++++++++ .../tvf/connector/converter/BinaryConverter.java | 20 ++ .../tvf/connector/converter/BlobConverter.java | 21 ++ .../tvf/connector/converter/BooleanConverter.java | 19 ++ .../tvf/connector/converter/DateConverter.java | 22 ++ .../tvf/connector/converter/DoubleConverter.java | 19 ++ .../tvf/connector/converter/FloatConverter.java | 19 ++ .../tvf/connector/converter/Int32Converter.java | 19 ++ .../tvf/connector/converter/Int64Converter.java | 19 ++ .../connector/converter/ResultSetConverter.java | 11 + .../tvf/connector/converter/StringConverter.java | 22 ++ .../tvf/connector/converter/TimeConverter.java | 23 ++ .../connector/converter/TimestampConverter.java | 23 ++ pom.xml | 1 + 21 files changed, 621 insertions(+), 1 deletion(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java index aa78e50f4d8..0fe40fc894b 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionDataProcessor.java @@ -65,4 +65,9 @@ public interface TableFunctionDataProcessor { List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) { // do nothing } + + /** This method is mainly used to release the resources used in the UDF. */ + default void beforeDestroy() { + // do nothing + } } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java index 41750fc3b53..ab8763f3c4a 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java @@ -39,4 +39,9 @@ public interface TableFunctionLeafProcessor { /** This method is called to determine if the processor has finished processing all data. */ boolean isFinish(); + + /** This method is mainly used to release the resources used in the UDF. */ + default void beforeDestroy() { + // do nothing + } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 3f7eb8f6507..3b8367716bf 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -142,6 +142,8 @@ public enum TSStatusCode { QUERY_TIMEOUT(720), PLAN_FAILED_NETWORK_PARTITION(721), CANNOT_FETCH_FI_STATE(722), + EXECUTION_FAILED_IN_EXTERNAL_DB(723), + CLOSE_FAILED_IN_EXTERNAL_DB(724), // Arithmetic NUMERIC_VALUE_OUT_OF_RANGE(750), diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 0a4d9418916..c0ae646b895 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -313,6 +313,11 @@ <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> </dependency> + <dependency> + <groupId>com.mysql</groupId> +
<artifactId>mysql-connector-j</artifactId> + <version>${mysql.jdbc.version}</version> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java index 47f2ec9de35..fcce22ea0f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java @@ -82,7 +82,9 @@ public class TableFunctionLeafOperator implements ProcessOperator { } @Override - public void close() throws Exception {} + public void close() throws Exception { + processor.beforeDestroy(); + } @Override public boolean isFinished() throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java index 4e9f7e436a5..c00e05ec17f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java @@ -174,6 +174,7 @@ public class TableFunctionOperator implements ProcessOperator { processor.finish(properColumnBuilders, passThroughIndexBuilder); resultTsBlocks.addAll(buildTsBlock(properColumnBuilders, passThroughIndexBuilder)); partitionCache.clear(); + processor.beforeDestroy(); processor = null; return resultTsBlocks.poll(); } else { @@ -259,6 +260,9 @@ public class 
TableFunctionOperator implements ProcessOperator { public void close() throws Exception { partitionCache.close(); inputOperator.close(); + if (processor != null) { + processor.beforeDestroy(); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/JDBCConnectionPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/JDBCConnectionPool.java new file mode 100644 index 00000000000..2f7d0c8d821 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/JDBCConnectionPool.java @@ -0,0 +1,108 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter.*; +import org.apache.iotdb.udf.api.type.Type; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class JDBCConnectionPool { + + private static final Logger LOGGER = LoggerFactory.getLogger(JDBCConnectionPool.class); + + private JDBCConnectionPool() {} + + static { + try { + Class.forName("com.mysql.cj.jdbc.Driver").newInstance(); + } catch (Exception e) { + LOGGER.warn("Failed to initialize mysql JDBC driver", e); + } + } + + public static Connection getConnection(String url, String userName, String password) + throws SQLException { + return DriverManager.getConnection(url, userName, password); + } + + private static class ConnectionWrapper {} + + public static Type translateJDBCTypeToUDFType(int type) { + switch (type) { + case java.sql.Types.TINYINT: + case java.sql.Types.SMALLINT: + case java.sql.Types.INTEGER: + return Type.INT32; + case java.sql.Types.BIGINT: + return Type.INT64; + case java.sql.Types.FLOAT: + return Type.FLOAT; + 
case java.sql.Types.DOUBLE: + return Type.DOUBLE; + case java.sql.Types.CHAR: + case java.sql.Types.VARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.NVARCHAR: + case java.sql.Types.LONGNVARCHAR: + return Type.STRING; + case java.sql.Types.DATE: + return Type.DATE; + case java.sql.Types.TIME: + case java.sql.Types.TIMESTAMP: + return Type.TIMESTAMP; + case java.sql.Types.BINARY: + case java.sql.Types.VARBINARY: + case java.sql.Types.LONGVARBINARY: + case java.sql.Types.BLOB: + return Type.BLOB; + case java.sql.Types.BOOLEAN: + return Type.BOOLEAN; + default: + throw new SemanticException("Unsupported JDBC type: " + type); + } + } + + public static ResultSetConverter getResultSetConverter(int type) { + switch (type) { + case java.sql.Types.TINYINT: + case java.sql.Types.SMALLINT: + case java.sql.Types.INTEGER: + return new Int32Converter(); + case java.sql.Types.BIGINT: + return new Int64Converter(); + case java.sql.Types.FLOAT: + return new FloatConverter(); + case java.sql.Types.DOUBLE: + return new DoubleConverter(); + case java.sql.Types.CHAR: + case java.sql.Types.VARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.NVARCHAR: + case java.sql.Types.LONGNVARCHAR: + return new StringConverter(); + case java.sql.Types.DATE: + return new DateConverter(); + case java.sql.Types.TIME: + return new TimeConverter(); + case java.sql.Types.TIMESTAMP: + return new TimestampConverter(); + case java.sql.Types.BINARY: + case java.sql.Types.VARBINARY: + case java.sql.Types.LONGVARBINARY: + return new BinaryConverter(); + case java.sql.Types.BLOB: + return new BlobConverter(); + case java.sql.Types.BOOLEAN: + return new BooleanConverter(); + default: + throw new SemanticException("Unsupported JDBC type: " + type); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/MySqlConnectorTableFunction.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/MySqlConnectorTableFunction.java new file mode 100644 index 00000000000..3981d290b10 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/MySqlConnectorTableFunction.java @@ -0,0 +1,251 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector; + +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter.ResultSetConverter; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.*; +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.List; +import java.util.Map; + +import static org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.JDBCConnectionPool.translateJDBCTypeToUDFType; +import static org.apache.iotdb.rpc.TSStatusCode.CLOSE_FAILED_IN_EXTERNAL_DB; +import static org.apache.iotdb.rpc.TSStatusCode.EXECUTION_FAILED_IN_EXTERNAL_DB; + +public class MySqlConnectorTableFunction implements TableFunction { + + private static class MySqlConnectorTableFunctionHandle implements TableFunctionHandle { + String sql; + String url; + String userName; + String password; + int[] types; + + public MySqlConnectorTableFunctionHandle() {} + + public MySqlConnectorTableFunctionHandle( + String sql, String url, String userName, String password, int[] types) { + this.sql = sql; + this.url = url; + this.userName = userName; + this.password = password; + this.types = types; + } + + List<ResultSetConverter> getConverters() { + List<ResultSetConverter> converters = new ArrayList<>(types.length); + for (int type : types) { + converters.add(JDBCConnectionPool.getResultSetConverter(type)); + } + return converters; + } + + @Override + public byte[] serialize() { + try (PublicBAOS publicBAOS = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(publicBAOS)) { + ReadWriteIOUtils.write(sql, outputStream); + ReadWriteIOUtils.write(url, outputStream); + ReadWriteIOUtils.write(userName, outputStream); + ReadWriteIOUtils.write(password, outputStream); + ReadWriteIOUtils.write(types.length, outputStream); + for (int type : types) { + ReadWriteIOUtils.write(type, outputStream); + } + outputStream.flush(); + return publicBAOS.toByteArray(); + } catch (IOException e) { + throw new IoTDBRuntimeException( + String.format( + "Error occurred while serializing MySqlConnectorTableFunctionHandle: %s", + e.getMessage()), + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + } + + @Override + public void deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + 
this.sql = ReadWriteIOUtils.readString(buffer); + this.url = ReadWriteIOUtils.readString(buffer); + this.userName = ReadWriteIOUtils.readString(buffer); + this.password = ReadWriteIOUtils.readString(buffer); + this.types = new int[ReadWriteIOUtils.readInt(buffer)]; + for (int i = 0; i < types.length; i++) { + types[i] = ReadWriteIOUtils.readInt(buffer); + } + } + } + + private static final String SQL = "SQL"; + private static final String URL = "URL"; + private static final String DEFAULT_URL = "jdbc:mysql://localhost:3306/test?"; + private static final String USERNAME = "USERNAME"; + private static final String DEFAULT_USERNAME = "root"; + private static final String PASSWORD = "PASSWORD"; + private static final String DEFAULT_PASSWORD = "root"; + + @Override + public List<ParameterSpecification> getArgumentsSpecifications() { + return Arrays.asList( + ScalarParameterSpecification.builder().name(SQL).type(Type.STRING).build(), + ScalarParameterSpecification.builder() + .name(URL) + .type(Type.STRING) + .defaultValue(DEFAULT_URL) + .build(), + ScalarParameterSpecification.builder() + .name(USERNAME) + .type(Type.STRING) + .defaultValue(DEFAULT_USERNAME) + .build(), + ScalarParameterSpecification.builder() + .name(PASSWORD) + .type(Type.STRING) + .defaultValue(DEFAULT_PASSWORD) + .build()); + } + + @Override + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { + + String sql = (String) ((ScalarArgument) arguments.get(SQL)).getValue(); + String url = (String) ((ScalarArgument) arguments.get(URL)).getValue(); + String userName = (String) ((ScalarArgument) arguments.get(USERNAME)).getValue(); + String password = (String) ((ScalarArgument) arguments.get(PASSWORD)).getValue(); + + DescribedSchema.Builder schemaBuilder = DescribedSchema.builder(); + int[] types; + try (Connection connection = JDBCConnectionPool.getConnection(url, userName, password); + PreparedStatement statement = connection.prepareStatement(sql)) { + 
ResultSetMetaData metaData = statement.getMetaData(); + types = new int[metaData.getColumnCount()]; + for (int i = 1, size = metaData.getColumnCount(); i <= size; i++) { + int type = metaData.getColumnType(i); + schemaBuilder.addField(metaData.getColumnName(i), translateJDBCTypeToUDFType(type)); + types[i - 1] = type; + } + } catch (SQLException e) { + throw new SemanticException(e); + } + MySqlConnectorTableFunctionHandle handle = + new MySqlConnectorTableFunctionHandle(sql, url, userName, password, types); + return TableFunctionAnalysis.builder() + .properColumnSchema(schemaBuilder.build()) + .handle(handle) + .build(); + } + + @Override + public TableFunctionHandle createTableFunctionHandle() { + return new MySqlConnectorTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionLeafProcessor getSplitProcessor() { + return new MysqlProcessor((MySqlConnectorTableFunctionHandle) tableFunctionHandle); + } + }; + } + + private static class MysqlProcessor implements TableFunctionLeafProcessor { + private static final int MAX_TSBLOCK_SIZE_IN_BYTES = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + private static final int MAX_TSBLOCK_LINE_NUMBER = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + private final MySqlConnectorTableFunctionHandle handle; + private final List<ResultSetConverter> converters; + private Connection connection; + private Statement statement; + private ResultSet resultSet; + private boolean finished = false; + + MysqlProcessor(MySqlConnectorTableFunctionHandle handle) { + this.handle = handle; + this.converters = handle.getConverters(); + } + + @Override + public void beforeStart() { + try { + this.connection = + JDBCConnectionPool.getConnection(handle.url, handle.userName, handle.password); + this.statement = 
connection.createStatement(); + this.resultSet = statement.executeQuery(handle.sql); + } catch (SQLException e) { + throw new IoTDBRuntimeException(e, EXECUTION_FAILED_IN_EXTERNAL_DB.getStatusCode(), true); + } + } + + @Override + public void process(List<ColumnBuilder> columnBuilders) { + finished = true; + try { + int count = 0; + while (resultSet.next()) { + long retainedSize = 0; + for (int i = 0, size = columnBuilders.size(); i < size; i++) { + converters.get(i).append(resultSet, i + 1, columnBuilders.get(i)); + retainedSize += columnBuilders.get(i).getRetainedSizeInBytes(); + } + count++; + if (retainedSize >= MAX_TSBLOCK_SIZE_IN_BYTES || count >= MAX_TSBLOCK_LINE_NUMBER) { + finished = false; + break; + } + } + } catch (SQLException e) { + throw new IoTDBRuntimeException(e, EXECUTION_FAILED_IN_EXTERNAL_DB.getStatusCode(), true); + } + } + + @Override + public boolean isFinish() { + return finished; + } + + /** + * Releases the JDBC resources opened in {@link #beforeStart()}. + * + * <p>try-with-resources closes the result set, the statement and the connection in that order + * and keeps closing the remaining ones when an earlier close() fails, so a failing ResultSet + * can no longer leak the connection. Resources that are still null (beforeStart failed + * half-way) are skipped. + * + * @throws IoTDBRuntimeException with CLOSE_FAILED_IN_EXTERNAL_DB if any close() fails; further + * failures are attached to the cause as suppressed exceptions + */ + @Override + public void beforeDestroy() { + try (Connection conn = connection; + Statement stmt = statement; + ResultSet rs = resultSet) { + // Nothing to do: closing the three resources is the whole purpose of this block. + } catch (SQLException e) { + throw new IoTDBRuntimeException(e, CLOSE_FAILED_IN_EXTERNAL_DB.getStatusCode(), true); + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BinaryConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BinaryConverter.java new file mode 100644 index 00000000000..8742244da0c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BinaryConverter.java @@ -0,0 +1,20 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.utils.Binary; + +import java.sql.ResultSet;
+import java.sql.SQLException; + +public class BinaryConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + byte[] value = row.getBytes(columnIndex); + if (value == null || row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeBinary(new Binary(value)); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BlobConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BlobConverter.java new file mode 100644 index 00000000000..7445d370600 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BlobConverter.java @@ -0,0 +1,21 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.utils.Binary; + +import java.sql.Blob; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class BlobConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + Blob blob = row.getBlob(columnIndex); + if (blob == null || row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeBinary(new Binary(blob.getBytes(1, (int) blob.length()))); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BooleanConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BooleanConverter.java new file mode 100644 index 00000000000..00b9edbea68 --- /dev/null +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/BooleanConverter.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class BooleanConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + boolean value = row.getBoolean(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeBoolean(value); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DateConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DateConverter.java new file mode 100644 index 00000000000..c4bfd8798c4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DateConverter.java @@ -0,0 +1,22 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class DateConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + java.sql.Date value = row.getDate(columnIndex); + if (value == null || row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + int year = value.getYear(); + int month = value.getMonth(); + int day = value.getDate(); + properColumnBuilder.writeInt((year + 1900) * 10_000 + (month + 1) * 100 + day); + } + } +} diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DoubleConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DoubleConverter.java new file mode 100644 index 00000000000..9c6fcc07d5f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/DoubleConverter.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class DoubleConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + double value = row.getDouble(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeDouble(value); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/FloatConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/FloatConverter.java new file mode 100644 index 00000000000..4be85a509f7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/FloatConverter.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class FloatConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + float value = 
row.getFloat(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeFloat(value); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int32Converter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int32Converter.java new file mode 100644 index 00000000000..a36f82390e3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int32Converter.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class Int32Converter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + int value = row.getInt(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeInt(value); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int64Converter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int64Converter.java new file mode 100644 index 00000000000..27369c6a11f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/Int64Converter.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class Int64Converter implements ResultSetConverter { + 
@Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + long value = row.getLong(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeLong(value); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/ResultSetConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/ResultSetConverter.java new file mode 100644 index 00000000000..47638ca1927 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/ResultSetConverter.java @@ -0,0 +1,11 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public interface ResultSetConverter { + void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException; +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/StringConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/StringConverter.java new file mode 100644 index 00000000000..8d74d457a37 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/StringConverter.java @@ -0,0 +1,22 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class 
StringConverter implements ResultSetConverter { + + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + String value = row.getString(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeBinary(new Binary(value, TSFileConfig.STRING_CHARSET)); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimeConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimeConverter.java new file mode 100644 index 00000000000..dc83dddb19f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimeConverter.java @@ -0,0 +1,23 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +public class TimeConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + java.sql.Time value = row.getTime(columnIndex); + if (value == null || row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeLong( + TimestampPrecisionUtils.convertToCurrPrecision(value.getTime(), TimeUnit.MILLISECONDS)); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimestampConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimestampConverter.java new file mode 100644 index 00000000000..96ff79a452c 
--- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/connector/converter/TimestampConverter.java @@ -0,0 +1,23 @@ +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.connector.converter; + +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +public class TimestampConverter implements ResultSetConverter { + @Override + public void append(ResultSet row, int columnIndex, ColumnBuilder properColumnBuilder) + throws SQLException { + java.sql.Timestamp value = row.getTimestamp(columnIndex); + if (row.wasNull()) { + properColumnBuilder.appendNull(); + } else { + properColumnBuilder.writeLong( + TimestampPrecisionUtils.convertToCurrPrecision(value.getTime(), TimeUnit.MILLISECONDS)); + } + } +} diff --git a/pom.xml b/pom.xml index ee463c157f7..7632de34dd9 100644 --- a/pom.xml +++ b/pom.xml @@ -176,6 +176,7 @@ <xz.version>1.9</xz.version> <zstd-jni.version>1.5.6-3</zstd-jni.version> <tsfile.version>2.1.0-250521-SNAPSHOT</tsfile.version> + <mysql.jdbc.version>9.3.0</mysql.jdbc.version> </properties> <!-- if we claim dependencies in dependencyManagement, then we do not claim
