This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ai-code/flight-sql in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5d60e292e88cf7ca749854b370459c62253ded99 Author: JackieTien97 <[email protected]> AuthorDate: Thu Feb 12 20:57:26 2026 +0800 Add Arrow Flight SQL service as external service plugin - Add new external-service-impl/flight-sql module with Arrow 17.0.0 - Implement FlightSqlService (IExternalService lifecycle management) - Implement IoTDBFlightSqlProducer (SQL execution via Coordinator, TsBlock→Arrow streaming) - Implement TsBlockToArrowConverter supporting all 10 data types - Implement Bearer token authentication (FlightSqlAuthHandler, FlightSqlSessionManager) - Add enableArrowFlightSqlService and arrowFlightSqlPort config to IoTDBConfig - Add FLIGHT_SQL entry to BuiltinExternalServices - Add Arrow Flight SQL port allocation and configuration to IT framework (13 files) - Add IoTDBArrowFlightSqlIT with 5 integration test cases - Add unit tests for TsBlockToArrowConverter (16 test cases) --- external-service-impl/flight-sql/pom.xml | 121 +++++ .../apache/iotdb/flight/FlightSqlAuthHandler.java | 59 +++ .../iotdb/flight/FlightSqlAuthMiddleware.java | 89 ++++ .../org/apache/iotdb/flight/FlightSqlService.java | 134 ++++++ .../iotdb/flight/FlightSqlSessionManager.java | 149 ++++++ .../iotdb/flight/IoTDBFlightSqlProducer.java | 531 +++++++++++++++++++++ .../iotdb/flight/TsBlockToArrowConverter.java | 277 +++++++++++ .../iotdb/flight/TsBlockToArrowConverterTest.java | 493 +++++++++++++++++++ external-service-impl/pom.xml | 1 + integration-test/pom.xml | 10 + .../iotdb/it/env/cluster/ClusterConstant.java | 1 + .../it/env/cluster/config/MppCommonConfig.java | 6 + .../it/env/cluster/config/MppDataNodeConfig.java | 6 + .../env/cluster/config/MppSharedCommonConfig.java | 7 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 13 +- .../iotdb/it/env/cluster/node/DataNodeWrapper.java | 9 + .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../it/env/remote/config/RemoteDataNodeConfig.java | 5 + .../iotdb/it/env/remote/env/RemoteServerEnv.java | 5 + .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../apache/iotdb/itbase/env/DataNodeConfig.java | 2 + .../it/flightsql/IoTDBArrowFlightSqlIT.java | 239 ++++++++++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 45 +- .../externalservice/BuiltinExternalServices.java | 6 +- pom.xml | 11 + 26 files changed, 2217 insertions(+), 11 deletions(-) diff --git a/external-service-impl/flight-sql/pom.xml b/external-service-impl/flight-sql/pom.xml new file mode 100644 index 00000000000..be363c336b7 --- /dev/null +++ b/external-service-impl/flight-sql/pom.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.iotdb</groupId> + <artifactId>external-service-impl</artifactId> + <version>2.0.7-SNAPSHOT</version> + </parent> + <artifactId>flight-sql</artifactId> + <name>IoTDB: External-Service-Impl: Arrow Flight SQL</name> + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <dependencies> + <!-- Arrow Flight SQL (bundled in jar-with-dependencies) --> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-sql</artifactId> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <scope>runtime</scope> + </dependency> + <!-- IoTDB dependencies (provided at runtime by DataNode classloader) --> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>iotdb-server</artifactId> + <scope>provided</scope> + <version>2.0.7-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>node-commons</artifactId> + <version>2.0.7-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tsfile</groupId> + <artifactId>tsfile</artifactId> + <version>${tsfile.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tsfile</groupId> + <artifactId>common</artifactId> + <version>${tsfile.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven.assembly.version}</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + <!-- this is used for inheritance merges --> + <phase>package</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <ignoredDependencies> + <ignoredDependency>org.apache.tsfile:common</ignoredDependency> + </ignoredDependencies> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java new file mode 100644 index 00000000000..b591665eab0 --- /dev/null +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Arrow Flight SQL credential validator using Arrow's built-in auth2 framework. Validates + * username/password credentials via IoTDB's SessionManager and returns a Bearer token string as the + * peer identity for subsequent requests. + * + * <p>Used with {@link BasicCallHeaderAuthenticator} and {@link + * org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator} to provide Basic → Bearer token + * authentication flow. + */ +public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.CredentialValidator { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthHandler.class); + private final FlightSqlSessionManager sessionManager; + + public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + @Override + public org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult validate( + String username, String password) { + LOGGER.debug("Validating credentials for user: {}", username); + + try { + String token = sessionManager.authenticate(username, password, "unknown"); + // Return the token as the peer identity; GeneratedBearerTokenAuthenticator + // wraps it in a Bearer token and sets it in the response header. + return () -> token; + } catch (SecurityException e) { + throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException(); + } + } +} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java new file mode 100644 index 00000000000..2cf0d048556 --- /dev/null +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.arrow.flight.CallHeaders; +import org.apache.arrow.flight.CallInfo; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightServerMiddleware; +import org.apache.arrow.flight.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flight Server middleware for handling Bearer token / Basic authentication. Supports initial login + * via Basic auth header (username:password), returning a Bearer token. Subsequent requests use the + * Bearer token. + */ +public class FlightSqlAuthMiddleware implements FlightServerMiddleware { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthMiddleware.class); + + /** The middleware key used to retrieve this middleware in the CallContext. */ + public static final Key<FlightSqlAuthMiddleware> KEY = Key.of("flight-sql-auth-middleware"); + + private final CallHeaders incomingHeaders; + + FlightSqlAuthMiddleware(CallHeaders incomingHeaders) { + this.incomingHeaders = incomingHeaders; + } + + /** Returns the incoming call headers for session lookup. */ + public CallHeaders getCallHeaders() { + return incomingHeaders; + } + + @Override + public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { + // no-op: token is set during Handshake response, not here + } + + @Override + public void onCallCompleted(CallStatus status) { + // no-op + } + + @Override + public void onCallErrored(Throwable err) { + // no-op + } + + // ===================== Factory ===================== + + /** Factory that creates FlightSqlAuthMiddleware for each call. */ + public static class Factory implements FlightServerMiddleware.Factory<FlightSqlAuthMiddleware> { + + private final FlightSqlSessionManager sessionManager; + + public Factory(FlightSqlSessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + @Override + public FlightSqlAuthMiddleware onCallStarted( + CallInfo callInfo, CallHeaders incomingHeaders, RequestContext context) { + return new FlightSqlAuthMiddleware(incomingHeaders); + } + + public FlightSqlSessionManager getSessionManager() { + return sessionManager; + } + } +} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java new file mode 100644 index 00000000000..96480449fcc --- /dev/null +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.externalservice.api.IExternalService; + +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; +import org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Arrow Flight SQL service implementation for IoTDB. Implements the IExternalService interface to + * integrate with IoTDB's external service management framework (plugin lifecycle). + * + * <p>This service starts a gRPC-based Arrow Flight SQL server that allows clients to execute SQL + * queries using the Arrow Flight SQL protocol and receive results in columnar Arrow format. + */ +public class FlightSqlService implements IExternalService { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlService.class); + private static final long SESSION_TIMEOUT_MINUTES = 30; + + private FlightServer flightServer; + private BufferAllocator allocator; + private FlightSqlSessionManager flightSessionManager; + private IoTDBFlightSqlProducer producer; + + @Override + public void start() { + int port = IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort(); + LOGGER.info("Starting Arrow Flight SQL service on port {}", port); + + try { + // Create the root allocator for Arrow memory management + allocator = new RootAllocator(Long.MAX_VALUE); + + // Create session manager with TTL + flightSessionManager = new FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES); + + // Create the auth handler + FlightSqlAuthHandler authHandler = new FlightSqlAuthHandler(flightSessionManager); + + // Create the Flight SQL producer + producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager); + + // Build the Flight server with auth2 Bearer token authentication + Location location = Location.forGrpcInsecure("0.0.0.0", port); + flightServer = + FlightServer.builder(allocator, location, producer) + .headerAuthenticator( + new GeneratedBearerTokenAuthenticator( + new BasicCallHeaderAuthenticator(authHandler))) + .build(); + + flightServer.start(); + LOGGER.info( + "Arrow Flight SQL service started successfully on port {}", flightServer.getPort()); + } catch (IOException e) { + LOGGER.error("Failed to start Arrow Flight SQL service", e); + stop(); + throw new RuntimeException("Failed to start Arrow Flight SQL service", e); + } + } + + @Override + public void stop() { + LOGGER.info("Stopping Arrow Flight SQL service"); + + if (flightServer != null) { + try { + flightServer.shutdown(); + flightServer.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for Flight server shutdown", e); + Thread.currentThread().interrupt(); + try { + flightServer.close(); + } catch (Exception ex) { + LOGGER.warn("Error force-closing Flight server", ex); + } + } catch (Exception e) { + LOGGER.warn("Error shutting down Flight server", e); + } + flightServer = null; + } + + if (producer != null) { + try { + producer.close(); + } catch (Exception e) { + LOGGER.warn("Error closing Flight SQL producer", e); + } + producer = null; + } + + if (flightSessionManager != null) { + flightSessionManager.close(); + flightSessionManager = null; + } + + if (allocator != null) { + allocator.close(); + allocator = null; + } + + LOGGER.info("Arrow Flight SQL service stopped"); + } +} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java new file mode 100644 index 00000000000..ad829888f22 --- /dev/null +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.arrow.flight.CallHeaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Manages Arrow Flight SQL client sessions using Bearer token authentication. Maps Bearer tokens to + * IoTDB IClientSession objects with a TTL-based Caffeine cache. + */ +public class FlightSqlSessionManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlSessionManager.class); + private static final String AUTHORIZATION_HEADER = "authorization"; + private static final String BEARER_PREFIX = "Bearer "; + + private final SessionManager sessionManager = SessionManager.getInstance(); + + /** Cache of Bearer token -> IClientSession with configurable TTL. */ + private final Cache<String, IClientSession> tokenCache; + + public FlightSqlSessionManager(long sessionTimeoutMinutes) { + this.tokenCache = + Caffeine.newBuilder() + .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES) + .removalListener( + (String token, IClientSession session, RemovalCause cause) -> { + if (session != null && cause != RemovalCause.REPLACED) { + LOGGER.info("Flight SQL session expired, closing: {}", session); + sessionManager.closeSession( + session, + queryId -> + org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance() + .cleanupQueryExecution(queryId), + false); + sessionManager.removeCurrSessionForMqtt(null); // handled via sessions map only + } + }) + .build(); + } + + /** + * Authenticates a user with username/password and returns a Bearer token. + * + * @param username the username + * @param password the password + * @param clientAddress the client's IP address + * @return the Bearer token if authentication succeeds + * @throws SecurityException if authentication fails + */ + public String authenticate(String username, String password, String clientAddress) { + // Create a session for this client + IClientSession session = new InternalClientSession("FlightSQL-" + clientAddress); + session.setSqlDialect(IClientSession.SqlDialect.TABLE); + + // Register the session before login (MQTT pattern) + sessionManager.registerSessionForMqtt(session); + + // Use SessionManager's login method + org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp loginResp = + sessionManager.login( + session, + username, + password, + java.time.ZoneId.systemDefault().getId(), + SessionManager.CURRENT_RPC_VERSION, + IoTDBConstant.ClientVersion.V_1_0, + IClientSession.SqlDialect.TABLE); + + if (loginResp.getCode() != org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // Remove the session if login failed + sessionManager.removeCurrSessionForMqtt(null); + throw new SecurityException("Authentication failed: " + loginResp.getMessage()); + } + + // Generate Bearer token and store in cache + String token = UUID.randomUUID().toString(); + tokenCache.put(token, session); + LOGGER.info("Flight SQL user '{}' authenticated, session: {}", username, session); + return token; + } + + /** + * Retrieves the IClientSession associated with a Bearer token from request headers. + * + * @param headers the Call headers containing the Authorization header + * @return the associated IClientSession + * @throws SecurityException if the token is invalid or expired + */ + public IClientSession getSession(CallHeaders headers) { + String authHeader = headers.get(AUTHORIZATION_HEADER); + if (authHeader == null || !authHeader.startsWith(BEARER_PREFIX)) { + throw new SecurityException("Missing or invalid Authorization header"); + } + String token = authHeader.substring(BEARER_PREFIX.length()); + return getSessionByToken(token); + } + + /** + * Retrieves the IClientSession associated with a Bearer token. + * + * @param token the Bearer token + * @return the associated IClientSession + * @throws SecurityException if the token is invalid or expired + */ + public IClientSession getSessionByToken(String token) { + IClientSession session = tokenCache.getIfPresent(token); + if (session == null) { + throw new SecurityException("Invalid or expired Bearer token"); + } + return session; + } + + /** Invalidates all sessions and cleans up resources. */ + public void close() { + tokenCache.invalidateAll(); + tokenCache.cleanUp(); + } +} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java new file mode 100644 index 00000000000..067ccfb20d4 --- /dev/null +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.protobuf.ByteString; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.Criteria; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.FlightSqlProducer; +import org.apache.arrow.flight.sql.impl.FlightSql; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.tsfile.read.common.block.TsBlock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.time.ZoneId; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Apache Arrow Flight SQL producer implementation for IoTDB. Handles SQL query execution via the + * Arrow Flight SQL protocol, using the Table model SQL dialect. + */ +public class IoTDBFlightSqlProducer implements FlightSqlProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBFlightSqlProducer.class); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + private final BufferAllocator allocator; + private final FlightSqlSessionManager flightSessionManager; + private final Coordinator coordinator = Coordinator.getInstance(); + private final SessionManager sessionManager = SessionManager.getInstance(); + private final SqlParser sqlParser = new SqlParser(); + private final Metadata metadata = LocalExecutionPlanner.getInstance().metadata; + + /** Stores query execution context by queryId for streaming results via getStream. */ + private final ConcurrentHashMap<Long, QueryContext> activeQueries = new ConcurrentHashMap<>(); + + public IoTDBFlightSqlProducer( + BufferAllocator allocator, FlightSqlSessionManager flightSessionManager) { + this.allocator = allocator; + this.flightSessionManager = flightSessionManager; + } + + // ===================== Session Retrieval ===================== + + /** + * Retrieves the IClientSession for the current call. The auth2 framework sets the peer identity + * to the Bearer token after handshake. + */ + private IClientSession getSessionFromContext(CallContext context) { + String peerIdentity = context.peerIdentity(); + if (peerIdentity == null || peerIdentity.isEmpty()) { + throw CallStatus.UNAUTHENTICATED.withDescription("Not authenticated").toRuntimeException(); + } + try { + return flightSessionManager.getSessionByToken(peerIdentity); + } catch (SecurityException e) { + throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException(); + } + } + + // ===================== SQL Query Execution ===================== + + @Override + public FlightInfo getFlightInfoStatement( + FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) { + String sql = command.getQuery(); + LOGGER.debug("getFlightInfoStatement: {}", sql); + + IClientSession session = getSessionFromContext(context); + + Long queryId = null; + try { + queryId = sessionManager.requestQueryId(); + + // Parse the SQL statement + Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault(), session); + + // Execute via Coordinator (Table model) + ExecutionResult result = + coordinator.executeForTableModel( + statement, + sqlParser, + session, + queryId, + sessionManager.getSessionInfo(session), + sql, + metadata, + CONFIG.getQueryTimeoutThreshold(), + true); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + throw CallStatus.INTERNAL + .withDescription("Query execution failed: " + result.status.getMessage()) + .toRuntimeException(); + } + + IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); + if (queryExecution == null) { + throw CallStatus.INTERNAL + .withDescription("Query execution not found after execution") + .toRuntimeException(); + } + + DatasetHeader header = queryExecution.getDatasetHeader(); + Schema arrowSchema = TsBlockToArrowConverter.toArrowSchema(header); + + // Store the query context for later getStream calls + activeQueries.put(queryId, new QueryContext(queryExecution, header, session)); + + // Build ticket containing the queryId + byte[] ticketBytes = Long.toString(queryId).getBytes(StandardCharsets.UTF_8); + Ticket ticket = new Ticket(ticketBytes); + FlightEndpoint endpoint = new FlightEndpoint(ticket); + + return new FlightInfo(arrowSchema, descriptor, Collections.singletonList(endpoint), -1, -1); + + } catch (RuntimeException e) { + // Cleanup on error + if (queryId != null) { + coordinator.cleanupQueryExecution(queryId); + activeQueries.remove(queryId); + } + throw e; + } + } + + @Override + public SchemaResult getSchemaStatement( + FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("getSchemaStatement not implemented") + .toRuntimeException(); + } + + @Override + public void getStreamStatement( + FlightSql.TicketStatementQuery ticketQuery, + CallContext context, + ServerStreamListener listener) { + ByteString handle = ticketQuery.getStatementHandle(); + long queryId = Long.parseLong(handle.toStringUtf8()); + streamQueryResults(queryId, listener); + } + + /** Streams query results for a given queryId as Arrow VectorSchemaRoot batches. */ + private void streamQueryResults(long queryId, ServerStreamListener listener) { + QueryContext ctx = activeQueries.get(queryId); + if (ctx == null) { + listener.error( + CallStatus.NOT_FOUND + .withDescription("Query not found for id: " + queryId) + .toRuntimeException()); + return; + } + + VectorSchemaRoot root = null; + try { + root = TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header, allocator); + listener.start(root); + + while (true) { + Optional<TsBlock> optionalTsBlock = ctx.queryExecution.getBatchResult(); + if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) { + break; + } + + TsBlock tsBlock = optionalTsBlock.get(); + root.clear(); + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, ctx.header); + listener.putNext(); + } + + listener.completed(); + } catch (IoTDBException e) { + LOGGER.error("Error streaming query results for queryId={}", queryId, e); + listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException()); + } finally { + coordinator.cleanupQueryExecution(queryId); + activeQueries.remove(queryId); + if (root != null) { + root.close(); + } + } + } + + // ===================== Generic Flight Methods ===================== + + @Override + public void listFlights( + CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) { + listener.onCompleted(); + } + + // ===================== Prepared Statements ===================== + + @Override + public void createPreparedStatement( + FlightSql.ActionCreatePreparedStatementRequest request, + CallContext context, + StreamListener<Result> listener) { + listener.onError( + CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException()); + } + + @Override + public void closePreparedStatement( + FlightSql.ActionClosePreparedStatementRequest request, + CallContext context, + StreamListener<Result> listener) { + listener.onError( + CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException()); + } + + @Override + public FlightInfo getFlightInfoPreparedStatement( + FlightSql.CommandPreparedStatementQuery command, + CallContext context, + FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException(); + } + + @Override + public SchemaResult getSchemaPreparedStatement( + FlightSql.CommandPreparedStatementQuery command, + CallContext context, + FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException(); + } + + @Override + public void getStreamPreparedStatement( + FlightSql.CommandPreparedStatementQuery command, + CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException(); + } + + // ===================== DML/Update ===================== + + @Override + public Runnable acceptPutStatement( + FlightSql.CommandStatementUpdate command, + CallContext context, + FlightStream flightStream, + StreamListener<PutResult> ackStream) { + throw CallStatus.UNIMPLEMENTED + .withDescription("Statement updates are not yet supported") + .toRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementUpdate( + FlightSql.CommandPreparedStatementUpdate command, + CallContext context, + FlightStream flightStream, + StreamListener<PutResult> ackStream) { + throw CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException(); + } + + @Override + public Runnable acceptPutPreparedStatementQuery( + FlightSql.CommandPreparedStatementQuery command, + CallContext context, + FlightStream flightStream, + StreamListener<PutResult> ackStream) { + throw CallStatus.UNIMPLEMENTED + .withDescription("Prepared statements are not yet supported") + .toRuntimeException(); + } + + // ===================== Catalog/Schema/Table Metadata ===================== + + @Override + public FlightInfo getFlightInfoSqlInfo( + FlightSql.CommandGetSqlInfo command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetSqlInfo not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamSqlInfo( + FlightSql.CommandGetSqlInfo command, CallContext context, ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetSqlInfo not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTypeInfo( + FlightSql.CommandGetXdbcTypeInfo command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetTypeInfo not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamTypeInfo( + FlightSql.CommandGetXdbcTypeInfo command, + CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetTypeInfo not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoCatalogs( + FlightSql.CommandGetCatalogs command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetCatalogs not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamCatalogs(CallContext context, ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetCatalogs not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoSchemas( + FlightSql.CommandGetDbSchemas command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetSchemas not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamSchemas( + FlightSql.CommandGetDbSchemas command, CallContext context, ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetSchemas not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTables( + FlightSql.CommandGetTables command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetTables not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamTables( + FlightSql.CommandGetTables command, CallContext context, ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetTables not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoTableTypes( + FlightSql.CommandGetTableTypes command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetTableTypes not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamTableTypes(CallContext context, ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetTableTypes not yet implemented") + .toRuntimeException(); + } + + // ===================== Keys ===================== + + @Override + public FlightInfo getFlightInfoPrimaryKeys( + FlightSql.CommandGetPrimaryKeys command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetPrimaryKeys not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamPrimaryKeys( + FlightSql.CommandGetPrimaryKeys command, CallContext context, ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetPrimaryKeys not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoExportedKeys( + FlightSql.CommandGetExportedKeys command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetExportedKeys not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamExportedKeys( + FlightSql.CommandGetExportedKeys command, + CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetExportedKeys not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoImportedKeys( + FlightSql.CommandGetImportedKeys command, CallContext context, FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetImportedKeys not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamImportedKeys( + FlightSql.CommandGetImportedKeys command, + CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetImportedKeys not yet implemented") + .toRuntimeException(); + } + + @Override + public FlightInfo getFlightInfoCrossReference( + FlightSql.CommandGetCrossReference command, + CallContext context, + FlightDescriptor descriptor) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetCrossReference not yet implemented") + .toRuntimeException(); + } + + @Override + public void getStreamCrossReference( + FlightSql.CommandGetCrossReference command, + CallContext context, + ServerStreamListener listener) { + throw CallStatus.UNIMPLEMENTED + .withDescription("GetCrossReference not yet implemented") + .toRuntimeException(); + } + + // ===================== Lifecycle ===================== + + @Override + public void close() throws Exception { + // Clean up all active queries + for (Long queryId : activeQueries.keySet()) { + try { + coordinator.cleanupQueryExecution(queryId); + } catch (Exception e) { + LOGGER.warn("Error cleaning up query {} during shutdown", queryId, e); + } + } + activeQueries.clear(); + } + + // ===================== Inner Classes ===================== + + /** Holds query execution context for streaming results. */ + private static class QueryContext { + + final IQueryExecution queryExecution; + final DatasetHeader header; + final IClientSession session; + + QueryContext(IQueryExecution queryExecution, DatasetHeader header, IClientSession session) { + this.queryExecution = queryExecution; + this.header = header; + this.session = session; + } + } +} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java new file mode 100644 index 00000000000..396f230d0b9 --- /dev/null +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Converts IoTDB TsBlock data to Apache Arrow VectorSchemaRoot format. */ +public class TsBlockToArrowConverter { + + private TsBlockToArrowConverter() { + // utility class + } + + /** Maps IoTDB TSDataType to Apache Arrow ArrowType. */ + public static ArrowType toArrowType(TSDataType tsDataType) { + switch (tsDataType) { + case BOOLEAN: + return ArrowType.Bool.INSTANCE; + case INT32: + return new ArrowType.Int(32, true); + case INT64: + return new ArrowType.Int(64, true); + case FLOAT: + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + case DOUBLE: + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + case TEXT: + case STRING: + return ArrowType.Utf8.INSTANCE; + case BLOB: + return ArrowType.Binary.INSTANCE; + case TIMESTAMP: + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"); + case DATE: + return new ArrowType.Date(DateUnit.DAY); + default: + throw new IllegalArgumentException("Unsupported TSDataType: " + tsDataType); + } + } + + /** Builds an Arrow Schema from an IoTDB DatasetHeader. */ + public static Schema toArrowSchema(DatasetHeader header) { + List<String> columnNames = header.getRespColumns(); + List<TSDataType> dataTypes = header.getRespDataTypes(); + List<Field> fields = new ArrayList<>(columnNames.size()); + for (int i = 0; i < columnNames.size(); i++) { + ArrowType arrowType = toArrowType(dataTypes.get(i)); + fields.add(new Field(columnNames.get(i), new FieldType(true, arrowType, null), null)); + } + return new Schema(fields); + } + + /** Creates a new VectorSchemaRoot from an IoTDB DatasetHeader. */ + public static VectorSchemaRoot createVectorSchemaRoot( + DatasetHeader header, BufferAllocator allocator) { + Schema arrowSchema = toArrowSchema(header); + return VectorSchemaRoot.create(arrowSchema, allocator); + } + + /** + * Fills an existing VectorSchemaRoot with data from a TsBlock. + * + * @param root the VectorSchemaRoot to fill (cleared before filling) + * @param tsBlock the TsBlock containing the data + * @param header the DatasetHeader for column name to index mapping + */ + public static void fillVectorSchemaRoot( + VectorSchemaRoot root, TsBlock tsBlock, DatasetHeader header) { + + int rowCount = tsBlock.getPositionCount(); + List<String> columnNames = header.getRespColumns(); + List<TSDataType> dataTypes = header.getRespDataTypes(); + Map<String, Integer> headerMap = header.getColumnNameIndexMap(); + + root.allocateNew(); + + for (int colIdx = 0; colIdx < columnNames.size(); colIdx++) { + String colName = columnNames.get(colIdx); + Integer sourceIdx = headerMap.get(colName); + Column column = tsBlock.getColumn(sourceIdx); + TSDataType dataType = dataTypes.get(colIdx); + FieldVector fieldVector = root.getVector(colIdx); + + fillColumnVector(fieldVector, column, dataType, rowCount); + } + + root.setRowCount(rowCount); + } + + /** Fills an Arrow FieldVector from an IoTDB Column. */ + private static void fillColumnVector( + FieldVector fieldVector, Column column, TSDataType dataType, int rowCount) { + switch (dataType) { + case BOOLEAN: + fillBooleanVector((BitVector) fieldVector, column, rowCount); + break; + case INT32: + fillInt32Vector((IntVector) fieldVector, column, rowCount); + break; + case INT64: + fillInt64Vector((BigIntVector) fieldVector, column, rowCount); + break; + case FLOAT: + fillFloatVector((Float4Vector) fieldVector, column, rowCount); + break; + case DOUBLE: + fillDoubleVector((Float8Vector) fieldVector, column, rowCount); + break; + case TEXT: + case STRING: + fillTextVector((VarCharVector) fieldVector, column, rowCount); + break; + case BLOB: + fillBlobVector((VarBinaryVector) fieldVector, column, rowCount); + break; + case TIMESTAMP: + fillTimestampVector((TimeStampMilliTZVector) fieldVector, column, rowCount); + break; + case DATE: + fillDateVector((DateDayVector) fieldVector, column, rowCount); + break; + default: + throw new IllegalArgumentException("Unsupported TSDataType: " + dataType); + } + } + + private static void fillBooleanVector(BitVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getBoolean(i) ? 1 : 0); + } + } + vector.setValueCount(rowCount); + } + + private static void fillInt32Vector(IntVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getInt(i)); + } + } + vector.setValueCount(rowCount); + } + + private static void fillInt64Vector(BigIntVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getLong(i)); + } + } + vector.setValueCount(rowCount); + } + + private static void fillFloatVector(Float4Vector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getFloat(i)); + } + } + vector.setValueCount(rowCount); + } + + private static void fillDoubleVector(Float8Vector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getDouble(i)); + } + } + vector.setValueCount(rowCount); + } + + private static void fillTextVector(VarCharVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + byte[] bytes = + column + .getBinary(i) + .getStringValue(TSFileConfig.STRING_CHARSET) + .getBytes(StandardCharsets.UTF_8); + vector.setSafe(i, bytes); + } + } + vector.setValueCount(rowCount); + } + + private static void fillBlobVector(VarBinaryVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getBinary(i).getValues()); + } + } + vector.setValueCount(rowCount); + } + + private static void fillTimestampVector( + TimeStampMilliTZVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getLong(i)); + } + } + vector.setValueCount(rowCount); + } + + private static void fillDateVector(DateDayVector vector, Column column, int rowCount) { + for (int i = 0; i < rowCount; i++) { + if (column.isNull(i)) { + vector.setNull(i); + } else { + vector.setSafe(i, column.getInt(i)); + } + } + vector.setValueCount(rowCount); + } +} diff --git a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java new file mode 100644 index 00000000000..413e17e23b7 --- /dev/null +++ b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.flight; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.apache.tsfile.utils.Binary; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** Unit tests for TsBlockToArrowConverter. */ +public class TsBlockToArrowConverterTest { + + private BufferAllocator allocator; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + + // ===================== toArrowType Tests ===================== + + @Test + public void testToArrowTypeMappings() { + assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.BOOLEAN) instanceof ArrowType.Bool); + + ArrowType int32Type = TsBlockToArrowConverter.toArrowType(TSDataType.INT32); + assertTrue(int32Type instanceof ArrowType.Int); + assertEquals(32, ((ArrowType.Int) int32Type).getBitWidth()); + assertTrue(((ArrowType.Int) int32Type).getIsSigned()); + + ArrowType int64Type = TsBlockToArrowConverter.toArrowType(TSDataType.INT64); + assertTrue(int64Type instanceof ArrowType.Int); + assertEquals(64, ((ArrowType.Int) int64Type).getBitWidth()); + assertTrue(((ArrowType.Int) int64Type).getIsSigned()); + + ArrowType floatType = TsBlockToArrowConverter.toArrowType(TSDataType.FLOAT); + assertTrue(floatType instanceof ArrowType.FloatingPoint); + assertEquals( + FloatingPointPrecision.SINGLE, ((ArrowType.FloatingPoint) floatType).getPrecision()); + + ArrowType doubleType = TsBlockToArrowConverter.toArrowType(TSDataType.DOUBLE); + assertTrue(doubleType instanceof ArrowType.FloatingPoint); + assertEquals( + FloatingPointPrecision.DOUBLE, ((ArrowType.FloatingPoint) doubleType).getPrecision()); + + assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.TEXT) instanceof ArrowType.Utf8); + assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.STRING) instanceof ArrowType.Utf8); + assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.BLOB) instanceof ArrowType.Binary); + + ArrowType tsType = TsBlockToArrowConverter.toArrowType(TSDataType.TIMESTAMP); + assertTrue(tsType instanceof ArrowType.Timestamp); + assertEquals(TimeUnit.MILLISECOND, ((ArrowType.Timestamp) tsType).getUnit()); + assertEquals("UTC", ((ArrowType.Timestamp) tsType).getTimezone()); + + ArrowType dateType = TsBlockToArrowConverter.toArrowType(TSDataType.DATE); + assertTrue(dateType instanceof ArrowType.Date); + assertEquals(DateUnit.DAY, ((ArrowType.Date) dateType).getUnit()); + } + + // ===================== toArrowSchema Tests ===================== + + @Test + public void testToArrowSchema() { + DatasetHeader header = buildHeader("col_bool", TSDataType.BOOLEAN, "col_int", TSDataType.INT32); + Schema schema = TsBlockToArrowConverter.toArrowSchema(header); + + assertEquals(2, schema.getFields().size()); + assertEquals("col_bool", schema.getFields().get(0).getName()); + assertTrue(schema.getFields().get(0).getType() instanceof ArrowType.Bool); + assertEquals("col_int", schema.getFields().get(1).getName()); + assertTrue(schema.getFields().get(1).getType() instanceof ArrowType.Int); + } + + // ===================== INT32 Conversion ===================== + + @Test + public void testFillInt32Values() { + DatasetHeader header = buildHeader("value", TSDataType.INT32); + TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {10, 20, 30}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + IntVector vector = (IntVector) root.getVector(0); + assertEquals(10, vector.get(0)); + assertEquals(20, vector.get(1)); + assertEquals(30, vector.get(2)); + } + } + + @Test + public void testFillInt32WithNulls() { + DatasetHeader header = buildHeader("value", TSDataType.INT32); + TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {10, null, 30}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + IntVector vector = (IntVector) root.getVector(0); + assertEquals(10, vector.get(0)); + assertTrue(vector.isNull(1)); + assertEquals(30, vector.get(2)); + } + } + + // ===================== INT64 Conversion ===================== + + @Test + public void testFillInt64Values() { + DatasetHeader header = buildHeader("value", TSDataType.INT64); + TsBlock tsBlock = buildTsBlock(TSDataType.INT64, new Object[] {100L, 200L, 300L}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + BigIntVector vector = (BigIntVector) root.getVector(0); + assertEquals(100L, vector.get(0)); + assertEquals(200L, vector.get(1)); + assertEquals(300L, vector.get(2)); + } + } + + // ===================== FLOAT Conversion ===================== + + @Test + public void testFillFloatValues() { + DatasetHeader header = buildHeader("value", TSDataType.FLOAT); + TsBlock tsBlock = buildTsBlock(TSDataType.FLOAT, new Object[] {1.1f, 2.2f, 3.3f}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + Float4Vector vector = (Float4Vector) root.getVector(0); + assertEquals(1.1f, vector.get(0), 0.001f); + assertEquals(2.2f, vector.get(1), 0.001f); + assertEquals(3.3f, vector.get(2), 0.001f); + } + } + + // ===================== DOUBLE Conversion ===================== + + @Test + public void testFillDoubleValues() { + DatasetHeader header = buildHeader("value", TSDataType.DOUBLE); + TsBlock tsBlock = buildTsBlock(TSDataType.DOUBLE, new Object[] {1.11, 2.22, 3.33}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + Float8Vector vector = (Float8Vector) root.getVector(0); + assertEquals(1.11, vector.get(0), 0.0001); + assertEquals(2.22, vector.get(1), 0.0001); + assertEquals(3.33, vector.get(2), 0.0001); + } + } + + // ===================== BOOLEAN Conversion ===================== + + @Test + public void testFillBooleanValues() { + DatasetHeader header = buildHeader("value", TSDataType.BOOLEAN); + TsBlock tsBlock = buildTsBlock(TSDataType.BOOLEAN, new Object[] {true, false, true}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + BitVector vector = (BitVector) root.getVector(0); + assertEquals(1, vector.get(0)); + assertEquals(0, vector.get(1)); + assertEquals(1, vector.get(2)); + } + } + + @Test + public void testFillBooleanWithNulls() { + DatasetHeader header = buildHeader("value", TSDataType.BOOLEAN); + TsBlock tsBlock = buildTsBlock(TSDataType.BOOLEAN, new Object[] {true, null, false}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + BitVector vector = (BitVector) root.getVector(0); + assertEquals(1, vector.get(0)); + assertTrue(vector.isNull(1)); + assertEquals(0, vector.get(2)); + } + } + + // ===================== TEXT/STRING Conversion ===================== + + @Test + public void testFillTextValues() { + DatasetHeader header = buildHeader("value", TSDataType.TEXT); + TsBlock tsBlock = + buildTsBlock( + TSDataType.TEXT, + new Object[] { + new Binary("hello", StandardCharsets.UTF_8), + new Binary("world", StandardCharsets.UTF_8), + new Binary("IoTDB", StandardCharsets.UTF_8) + }); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + VarCharVector vector = (VarCharVector) root.getVector(0); + assertEquals("hello", new String(vector.get(0), StandardCharsets.UTF_8)); + assertEquals("world", new String(vector.get(1), StandardCharsets.UTF_8)); + assertEquals("IoTDB", new String(vector.get(2), StandardCharsets.UTF_8)); + } + } + + @Test + public void testFillTextWithNulls() { + DatasetHeader header = buildHeader("value", TSDataType.TEXT); + TsBlock tsBlock = + buildTsBlock( + TSDataType.TEXT, + new Object[] {new Binary("hello", StandardCharsets.UTF_8), null, null}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + VarCharVector vector = (VarCharVector) root.getVector(0); + assertEquals("hello", new String(vector.get(0), StandardCharsets.UTF_8)); + assertTrue(vector.isNull(1)); + assertTrue(vector.isNull(2)); + } + } + + // ===================== BLOB Conversion ===================== + + @Test + public void testFillBlobValues() { + DatasetHeader header = buildHeader("value", TSDataType.BLOB); + byte[] bytes1 = {0x01, 0x02, 0x03}; + byte[] bytes2 = {0x04, 0x05}; + TsBlock tsBlock = + buildTsBlock(TSDataType.BLOB, new Object[] {new Binary(bytes1), new Binary(bytes2)}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(2, root.getRowCount()); + VarBinaryVector vector = (VarBinaryVector) root.getVector(0); + assertArrayEquals(bytes1, vector.get(0)); + assertArrayEquals(bytes2, vector.get(1)); + } + } + + // ===================== TIMESTAMP Conversion ===================== + + @Test + public void testFillTimestampValues() { + DatasetHeader header = buildHeader("value", TSDataType.TIMESTAMP); + TsBlock tsBlock = + buildTsBlock( + TSDataType.TIMESTAMP, new Object[] {1609459200000L, 1609459260000L, 1609459320000L}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + TimeStampMilliTZVector vector = (TimeStampMilliTZVector) root.getVector(0); + assertEquals(1609459200000L, vector.get(0)); + assertEquals(1609459260000L, vector.get(1)); + assertEquals(1609459320000L, vector.get(2)); + } + } + + // ===================== DATE Conversion ===================== + + @Test + public void testFillDateValues() { + DatasetHeader header = buildHeader("value", TSDataType.DATE); + // Epoch days: 2021-01-01 = 18628 + TsBlock tsBlock = buildTsBlock(TSDataType.DATE, new Object[] {18628, 18629, 18630}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(3, root.getRowCount()); + DateDayVector vector = (DateDayVector) root.getVector(0); + assertEquals(18628, vector.get(0)); + assertEquals(18629, vector.get(1)); + assertEquals(18630, vector.get(2)); + } + } + + // ===================== Multi-Column Tests ===================== + + @Test + public void testFillMultipleColumns() { + List<ColumnHeader> headers = new ArrayList<>(); + headers.add(new ColumnHeader("int_col", TSDataType.INT32)); + headers.add(new ColumnHeader("double_col", TSDataType.DOUBLE)); + headers.add(new ColumnHeader("text_col", TSDataType.TEXT)); + DatasetHeader header = new DatasetHeader(headers, true); + List<String> colNames = header.getRespColumns(); + header.setTreeColumnToTsBlockIndexMap(colNames); + + List<TSDataType> types = Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE, TSDataType.TEXT); + TsBlockBuilder builder = new TsBlockBuilder(types); + ColumnBuilder[] cols = builder.getValueColumnBuilders(); + + // Row 1 + cols[0].writeInt(42); + cols[1].writeDouble(3.14); + cols[2].writeBinary(new Binary("test", StandardCharsets.UTF_8)); + builder.declarePosition(); + + // Row 2 + cols[0].writeInt(100); + cols[1].writeDouble(2.71); + cols[2].writeBinary(new Binary("hello", StandardCharsets.UTF_8)); + builder.declarePosition(); + + TsBlock tsBlock = + builder.build( + new RunLengthEncodedColumn( + new TimeColumn(1, new long[] {0L}), builder.getPositionCount())); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + + assertEquals(2, root.getRowCount()); + + IntVector intVec = (IntVector) root.getVector("int_col"); + assertEquals(42, intVec.get(0)); + assertEquals(100, intVec.get(1)); + + Float8Vector doubleVec = (Float8Vector) root.getVector("double_col"); + assertEquals(3.14, doubleVec.get(0), 0.001); + assertEquals(2.71, doubleVec.get(1), 0.001); + + VarCharVector textVec = (VarCharVector) root.getVector("text_col"); + assertEquals("test", new String(textVec.get(0), StandardCharsets.UTF_8)); + assertEquals("hello", new String(textVec.get(1), StandardCharsets.UTF_8)); + } + } + + // ===================== Empty TsBlock ===================== + + @Test + public void testFillEmptyTsBlock() { + DatasetHeader header = buildHeader("value", TSDataType.INT32); + TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {}); + + try (VectorSchemaRoot root = + TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) { + TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header); + assertEquals(0, root.getRowCount()); + } + } + + // ===================== Helper Methods ===================== + + /** Build a simple DatasetHeader with one or more columns (name, type pairs). */ + private DatasetHeader buildHeader(Object... nameTypePairs) { + List<ColumnHeader> columnHeaders = new ArrayList<>(); + for (int i = 0; i < nameTypePairs.length; i += 2) { + columnHeaders.add( + new ColumnHeader((String) nameTypePairs[i], (TSDataType) nameTypePairs[i + 1])); + } + DatasetHeader header = new DatasetHeader(columnHeaders, true); + List<String> colNames = header.getRespColumns(); + header.setTreeColumnToTsBlockIndexMap(colNames); + return header; + } + + /** + * Build a single-column TsBlock with the given data type and values. Null values in the array + * produce null entries in the TsBlock. + */ + private TsBlock buildTsBlock(TSDataType type, Object[] values) { + TsBlockBuilder builder = new TsBlockBuilder(java.util.Collections.singletonList(type)); + ColumnBuilder[] cols = builder.getValueColumnBuilders(); + + for (Object value : values) { + if (value == null) { + cols[0].appendNull(); + } else { + switch (type) { + case BOOLEAN: + cols[0].writeBoolean((Boolean) value); + break; + case INT32: + case DATE: + cols[0].writeInt((Integer) value); + break; + case INT64: + case TIMESTAMP: + cols[0].writeLong((Long) value); + break; + case FLOAT: + cols[0].writeFloat((Float) value); + break; + case DOUBLE: + cols[0].writeDouble((Double) value); + break; + case TEXT: + case STRING: + case BLOB: + cols[0].writeBinary((Binary) value); + break; + default: + throw new IllegalArgumentException("Unsupported type: " + type); + } + } + builder.declarePosition(); + } + + return builder.build( + new RunLengthEncodedColumn(new TimeColumn(1, new long[] {0L}), builder.getPositionCount())); + } +} diff --git a/external-service-impl/pom.xml b/external-service-impl/pom.xml index 04483b289f3..511a58f2bef 100644 --- a/external-service-impl/pom.xml +++ b/external-service-impl/pom.xml @@ -33,6 +33,7 @@ <module>mqtt</module> <module>rest</module> <module>rest-openapi</module> + <module>flight-sql</module> </modules> <properties> <maven.compiler.source>8</maven.compiler.source> diff --git a/integration-test/pom.xml b/integration-test/pom.xml index a4776a57c55..4ef730957e4 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -246,6 +246,16 @@ <!--We will integrate rest-jar-with-dependencies into lib by assembly plugin--> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-sql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index 82ed7976959..114556c8b68 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -170,6 +170,7 @@ public class ClusterConstant { public static final String PIPE_LIB_DIR = "pipe_lib_dir"; public static final String REST_SERVICE_PORT = "rest_service_port"; public static final String INFLUXDB_RPC_PORT = "influxdb_rpc_port"; + public static final String ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port"; // ConfigNode public static final String CN_SYSTEM_DIR = "cn_system_dir"; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 53aa395491b..d043cc4fbb2 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -315,6 +315,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService) { + setProperty("enable_arrow_flight_sql_service", String.valueOf(enableArrowFlightSqlService)); + return this; + } + @Override public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { setProperty("mqtt_payload_formatter", String.valueOf(mqttPayloadFormatter)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 5e418072a7d..8fcb242ca12 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -102,6 +102,12 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { return this; } + @Override + public DataNodeConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService) { + setProperty("enable_arrow_flight_sql_service", String.valueOf(enableArrowFlightSqlService)); + return this; + } + @Override public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { setProperty("mqtt_payload_formatter", String.valueOf(mqttPayloadFormatter)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index ed8a8943303..b73b55510c0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -316,6 +316,13 @@ public class MppSharedCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService) { + cnConfig.setEnableArrowFlightSqlService(enableArrowFlightSqlService); + dnConfig.setEnableArrowFlightSqlService(enableArrowFlightSqlService); + return this; + } + @Override public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 8308225a9af..d223b1e32ee 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -955,7 +955,8 @@ public abstract class AbstractEnv implements BaseEnv { } // use this to avoid some runtimeExceptions when try to get jdbc connections. - // because it is hard to add retry and handle exception when getting jdbc connections in + // because it is hard to add retry and handle exception when getting jdbc + // connections in // getWriteConnectionWithSpecifiedDataNode and getReadConnections. // so use this function to add retry when cluster is ready. // after retryCount times, if the jdbc can't connect, throw @@ -1426,7 +1427,8 @@ public abstract class AbstractEnv implements BaseEnv { final String endpoint = nodes.get(j).getIpAndPortString(); if (!nodeIds.containsKey(endpoint)) { // Node not exist - // Notice: Never modify this line, since the NodeLocation might be modified in IT + // Notice: Never modify this line, since the NodeLocation might be modified in + // IT errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!"); continue; } @@ -1463,6 +1465,13 @@ public abstract class AbstractEnv implements BaseEnv { .getMqttPort(); } + @Override + public int getArrowFlightSqlPort() { + return dataNodeWrapperList + .get(new Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size())) + .getArrowFlightSqlPort(); + } + @Override public String getIP() { return dataNodeWrapperList.get(0).getIp(); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index 96a0fbe27e0..c4111c0f5f6 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.List; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.ARROW_FLIGHT_SQL_PORT; import static org.apache.iotdb.it.env.cluster.ClusterConstant.CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DATANODE_INIT_HEAP_SIZE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DATANODE_MAX_DIRECT_MEMORY_SIZE; @@ -78,6 +79,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper { private final int dataRegionConsensusPort; private final int schemaRegionConsensusPort; private final int mqttPort; + private final int arrowFlightSqlPort; private final int restServicePort; private final int pipeAirGapReceiverPort; @@ -103,6 +105,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper { this.schemaRegionConsensusPort = portList[4]; this.mqttPort = portList[5]; this.pipeAirGapReceiverPort = portList[6]; + this.arrowFlightSqlPort = portList[7]; this.restServicePort = portList[10] + 6000; this.defaultNodePropertiesFile = EnvUtils.getFilePathFromSysVar(DEFAULT_DATA_NODE_PROPERTIES, clusterIndex); @@ -120,6 +123,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper { MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData"); immutableCommonProperties.setProperty( PIPE_AIR_GAP_RECEIVER_PORT, String.valueOf(this.pipeAirGapReceiverPort)); + immutableCommonProperties.setProperty( + ARROW_FLIGHT_SQL_PORT, String.valueOf(this.arrowFlightSqlPort)); immutableNodeProperties.setProperty(REST_SERVICE_PORT, String.valueOf(restServicePort)); @@ -304,6 +309,10 @@ public class DataNodeWrapper extends AbstractNodeWrapper { return pipeAirGapReceiverPort; } + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + public int getRestServicePort() { return restServicePort; } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 0c05af17f6a..db1a713ee09 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -221,6 +221,11 @@ public class RemoteCommonConfig implements CommonConfig { return this; } + @Override + public CommonConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService) { + return this; + } + @Override public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index bba4c964f95..d4526ad2834 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -64,6 +64,11 @@ public class RemoteDataNodeConfig implements DataNodeConfig { return this; } + @Override + public DataNodeConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService) { + return this; + } + @Override public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 586eff60494..5ad15d43e44 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -480,6 +480,11 @@ public class RemoteServerEnv implements BaseEnv { throw new UnsupportedOperationException(); } + @Override + public int getArrowFlightSqlPort() { + throw new UnsupportedOperationException(); + } + @Override public String getIP() { return ip_addr; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 8b32deb3ea8..43175676ab8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -332,6 +332,8 @@ public interface BaseEnv { int getMqttPort(); + int getArrowFlightSqlPort(); + String getIP(); String getPort(); diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index 95ac239e89b..d5af1068ec9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -102,6 +102,8 @@ public interface CommonConfig { CommonConfig setEnableMQTTService(boolean enableMQTTService); + CommonConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService); + CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter); CommonConfig setSchemaEngineMode(String schemaEngineMode); diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index d57015b1396..9cbdf393e8d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -40,6 +40,8 @@ public interface DataNodeConfig { DataNodeConfig setEnableMQTTService(boolean enableMQTTService); + DataNodeConfig setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService); + DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter); DataNodeConfig setLoadLastCacheStrategy(String strategyName); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java new file mode 100644 index 00000000000..d931ca77999 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.relational.it.flightsql; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.auth2.BasicAuthCredentialWriter; +import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler; +import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware; +import org.apache.arrow.flight.grpc.CredentialCallOption; +import org.apache.arrow.flight.sql.FlightSqlClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Integration tests for Arrow Flight SQL service in IoTDB. Tests the end-to-end flow: client + * connects via Flight SQL protocol, authenticates, executes SQL queries, and receives + * Arrow-formatted results. + */ +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBArrowFlightSqlIT { + + private static final String DATABASE = "flightsql_test_db"; + private static final String USER = "root"; + private static final String PASSWORD = "root"; + + private BufferAllocator allocator; + private FlightClient flightClient; + private FlightSqlClient flightSqlClient; + private CredentialCallOption bearerToken; + + @Before + public void setUp() throws Exception { + // Configure and start the cluster with Arrow Flight SQL enabled + BaseEnv baseEnv = EnvFactory.getEnv(); + baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true); + baseEnv.initClusterEnvironment(); + + // Get the Flight SQL port from the data node + int port = EnvFactory.getEnv().getArrowFlightSqlPort(); + + // Create Arrow allocator and Flight client with Bearer token auth middleware + allocator = new RootAllocator(Long.MAX_VALUE); + Location location = Location.forGrpcInsecure("127.0.0.1", port); + + // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from the + // auth handshake + ClientIncomingAuthHeaderMiddleware.Factory authFactory = + new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); + + flightClient = FlightClient.builder(allocator, location).intercept(authFactory).build(); + + // Authenticate: sends Basic credentials, server returns Bearer token + bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + + // Wrap in FlightSqlClient for Flight SQL protocol operations + flightSqlClient = new FlightSqlClient(flightClient); + + // Use the standard session to create the test database and table with data + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " + DATABASE); + } + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) { + session.executeNonQueryStatement( + "CREATE TABLE test_table (" + + "id1 STRING TAG, " + + "s1 INT32 FIELD, " + + "s2 INT64 FIELD, " + + "s3 FLOAT FIELD, " + + "s4 DOUBLE FIELD, " + + "s5 BOOLEAN FIELD, " + + "s6 TEXT FIELD)"); + session.executeNonQueryStatement( + "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) " + + "VALUES(1, 'device1', 100, 1000, 1.5, 2.5, true, 'hello')"); + session.executeNonQueryStatement( + "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) " + + "VALUES(2, 'device1', 200, 2000, 3.5, 4.5, false, 'world')"); + session.executeNonQueryStatement( + "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) " + + "VALUES(3, 'device2', 300, 3000, 5.5, 6.5, true, 'iotdb')"); + } + } + + @After + public void tearDown() throws Exception { + if (flightSqlClient != null) { + try { + flightSqlClient.close(); + } catch (Exception e) { + // ignore + } + } + if (flightClient != null) { + try { + flightClient.close(); + } catch (Exception e) { + // ignore + } + } + if (allocator != null) { + allocator.close(); + } + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testQueryWithAllDataTypes() throws Exception { + FlightInfo flightInfo = + flightSqlClient.execute( + "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY time", bearerToken); + + // Validate schema + Schema schema = flightInfo.getSchema(); + assertNotNull("Schema should not be null", schema); + List<Field> fields = schema.getFields(); + assertEquals("Should have 8 columns", 8, fields.size()); + + // Fetch all data + List<List<String>> rows = fetchAllRows(flightInfo); + assertEquals("Should have 3 rows", 3, rows.size()); + } + + @Test + public void testQueryWithFilter() throws Exception { + FlightInfo flightInfo = + flightSqlClient.execute( + "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY time", bearerToken); + + List<List<String>> rows = fetchAllRows(flightInfo); + assertEquals("Should have 2 rows for device1", 2, rows.size()); + } + + @Test + public void testQueryWithAggregation() throws Exception { + FlightInfo flightInfo = + flightSqlClient.execute( + "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum " + + "FROM test_table GROUP BY id1 ORDER BY id1", + bearerToken); + + List<List<String>> rows = fetchAllRows(flightInfo); + assertEquals("Should have 2 groups", 2, rows.size()); + } + + @Test + public void testEmptyResult() throws Exception { + FlightInfo flightInfo = + flightSqlClient.execute("SELECT * FROM test_table WHERE id1 = 'nonexistent'", bearerToken); + + List<List<String>> rows = fetchAllRows(flightInfo); + assertEquals("Should have 0 rows", 0, rows.size()); + } + + @Test + public void testShowDatabases() throws Exception { + FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", bearerToken); + + List<List<String>> rows = fetchAllRows(flightInfo); + assertTrue("Should have at least 1 database", rows.size() >= 1); + + boolean found = false; + for (List<String> row : rows) { + for (String val : row) { + if (val.contains(DATABASE)) { + found = true; + break; + } + } + } + assertTrue("Should find test database " + DATABASE, found); + } + + /** + * Fetches all rows from all endpoints in a FlightInfo. Each row is a list of string + * representations of the column values. + */ + private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws Exception { + List<List<String>> rows = new ArrayList<>(); + for (FlightEndpoint endpoint : flightInfo.getEndpoints()) { + try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) { + while (stream.next()) { + VectorSchemaRoot root = stream.getRoot(); + int rowCount = root.getRowCount(); + for (int i = 0; i < rowCount; i++) { + List<String> row = new ArrayList<>(); + for (FieldVector vector : root.getFieldVectors()) { + Object value = vector.getObject(i); + row.add(value == null ? "null" : value.toString()); + } + rows.add(row); + } + } + } + } + return rows; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 98c15a2d9bf..3fef5d79fe1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -95,7 +95,7 @@ public class IoTDBConfig { // e.g., a31+/$%#&[]{}3e4, "a.b", 'a.b' private static final String NODE_NAME_MATCHER = "([^\n\t]+)"; - // e.g., .s1 + // e.g., .s1 private static final String PARTIAL_NODE_MATCHER = "[" + PATH_SEPARATOR + "]" + NODE_NAME_MATCHER; private static final String NODE_MATCHER = @@ -106,6 +106,12 @@ public class IoTDBConfig { /** Whether to enable the mqtt service. */ private boolean enableMQTTService = false; + /** Whether to enable the Arrow Flight SQL service. */ + private boolean enableArrowFlightSqlService = false; + + /** The Arrow Flight SQL service binding port. */ + private int arrowFlightSqlPort = 8904; + /** The mqtt service binding host. */ private String mqttHost = "127.0.0.1"; @@ -703,14 +709,16 @@ public class IoTDBConfig { private int compactionMaxAlignedSeriesNumInOneBatch = 10; /* - * How many thread will be set up to perform continuous queries. When <= 0, use max(1, CPU core number / 2). + * How many thread will be set up to perform continuous queries. When <= 0, use + * max(1, CPU core number / 2). */ private int continuousQueryThreadNum = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); /* * Minimum every interval to perform continuous query. - * The every interval of continuous query instances should not be lower than this limit. + * The every interval of continuous query instances should not be lower than + * this limit. */ private long continuousQueryMinimumEveryInterval = 1000; @@ -789,7 +797,8 @@ public class IoTDBConfig { private int tagAttributeFlushInterval = 1000; // In one insert (one device, one timestamp, multiple measurements), - // if enable partial insert, one measurement failure will not impact other measurements + // if enable partial insert, one measurement failure will not impact other + // measurements private boolean enablePartialInsert = true; private boolean enable13DataInsertAdapt = false; @@ -1108,7 +1117,8 @@ public class IoTDBConfig { private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096; private int loadTsFileAnalyzeSchemaBatchFlushTableDeviceNumber = 4096; // For table model private long loadTsFileAnalyzeSchemaMemorySizeInBytes = - 0L; // 0 means that the decision will be adaptive based on the number of sequences + 0L; // 0 means that the decision will be adaptive based on the + // number of sequences private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024; @@ -1449,7 +1459,8 @@ public class IoTDBConfig { SystemMetrics.getInstance().setDiskDirs(diskDirs); } - // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the same with IOTDB_HOME + // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the same + // with IOTDB_HOME // In this way, we can keep consistent with v0.13.0~2. public static String addDataHomeDir(final String dir) { String dataHomeDir = System.getProperty(IoTDBConstant.IOTDB_DATA_HOME, null); @@ -1518,7 +1529,8 @@ public class IoTDBConfig { public void setTierDataDirs(String[][] tierDataDirs) { formulateDataDirs(tierDataDirs); this.tierDataDirs = tierDataDirs; - // TODO(szywilliam): rewrite the logic here when ratis supports complete snapshot semantic + // TODO(szywilliam): rewrite the logic here when ratis supports complete + // snapshot semantic setRatisDataRegionSnapshotDir( tierDataDirs[0][0] + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME); } @@ -1730,7 +1742,8 @@ public class IoTDBConfig { public void checkMultiDirStrategyClassName() { confirmMultiDirStrategy(); for (String multiDirStrategy : CLUSTER_ALLOWED_MULTI_DIR_STRATEGIES) { - // If the multiDirStrategyClassName is one of cluster allowed strategy, the check is passed. + // If the multiDirStrategyClassName is one of cluster allowed strategy, the + // check is passed. if (multiDirStrategyClassName.equals(multiDirStrategy) || multiDirStrategyClassName.equals(MULTI_DIR_STRATEGY_PREFIX + multiDirStrategy)) { return; @@ -2535,6 +2548,22 @@ public class IoTDBConfig { this.enableMQTTService = enableMQTTService; } + public boolean isEnableArrowFlightSqlService() { + return enableArrowFlightSqlService; + } + + public void setEnableArrowFlightSqlService(boolean enableArrowFlightSqlService) { + this.enableArrowFlightSqlService = enableArrowFlightSqlService; + } + + public int getArrowFlightSqlPort() { + return arrowFlightSqlPort; + } + + public void setArrowFlightSqlPort(int arrowFlightSqlPort) { + this.arrowFlightSqlPort = arrowFlightSqlPort; + } + public String getMqttHost() { return mqttHost; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java index 157734aef56..820a5b75822 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java @@ -32,7 +32,11 @@ public enum BuiltinExternalServices { REST( "REST", "org.apache.iotdb.rest.RestService", - IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService); + IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService), + FLIGHT_SQL( + "FLIGHT_SQL", + "org.apache.iotdb.flight.FlightSqlService", + IoTDBDescriptor.getInstance().getConfig()::isEnableArrowFlightSqlService); private final String serviceName; private final String className; diff --git a/pom.xml b/pom.xml index 99ba9725ca6..334b89200b5 100644 --- a/pom.xml +++ b/pom.xml @@ -174,6 +174,7 @@ <xz.version>1.9</xz.version> <zstd-jni.version>1.5.6-3</zstd-jni.version> <tsfile.version>2.2.1-260206-SNAPSHOT</tsfile.version> + <arrow.version>17.0.0</arrow.version> </properties> <!-- if we claim dependencies in dependencyManagement, then we do not claim @@ -748,6 +749,16 @@ <artifactId>jackson-core</artifactId> <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-sql</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${arrow.version}</version> + </dependency> </dependencies> </dependencyManagement> <build>
