This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ai-code/flight-sql
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ai-code/flight-sql by this push:
     new 25b5eb5a9ec Add Arrow Flight SQL service as external service plugin [...]
25b5eb5a9ec is described below
commit 25b5eb5a9ec8c70c18fb53b276679c730e345242
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Feb 12 20:57:26 2026 +0800
Add Arrow Flight SQL service as external service plugin
- Add new external-service-impl/flight-sql module with Arrow 17.0.0
- Implement FlightSqlService (IExternalService lifecycle management)
- Implement IoTDBFlightSqlProducer (SQL execution via Coordinator,
TsBlock→Arrow streaming)
- Implement TsBlockToArrowConverter supporting all 10 data types
- Implement Bearer token authentication (FlightSqlAuthHandler,
FlightSqlSessionManager)
- Add enableArrowFlightSqlService and arrowFlightSqlPort config to
IoTDBConfig
- Add FLIGHT_SQL entry to BuiltinExternalServices
- Add Arrow Flight SQL port allocation and configuration to IT framework
(13 files)
- Add IoTDBArrowFlightSqlIT with 5 integration test cases
- Add unit tests for TsBlockToArrowConverter (16 test cases)
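For reference, the resulting service is consumed with a standard Arrow Flight SQL client: Basic credentials are exchanged once for a Bearer token, then statements are executed and streamed. A minimal client-side sketch follows; the host, port (8904), credentials and SQL text are placeholders, not values defined by this commit.

    import org.apache.arrow.flight.FlightClient;
    import org.apache.arrow.flight.FlightInfo;
    import org.apache.arrow.flight.FlightStream;
    import org.apache.arrow.flight.Location;
    import org.apache.arrow.flight.grpc.CredentialCallOption;
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    public class FlightSqlClientSketch {
      public static void main(String[] args) throws Exception {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
          FlightClient client =
              FlightClient.builder(allocator, Location.forGrpcInsecure("127.0.0.1", 8904)).build();
          // Basic credentials are sent once; the returned CallOption carries the Bearer
          // token issued by the server and re-sends it on every later request.
          CredentialCallOption bearer =
              client.authenticateBasicToken("root", "root").orElseThrow(IllegalStateException::new);
          try (FlightSqlClient sqlClient = new FlightSqlClient(client)) {
            // getFlightInfoStatement: plans the query and returns one endpoint with a ticket.
            FlightInfo info = sqlClient.execute("SELECT * FROM table1", bearer);
            // getStreamStatement: streams the result as Arrow record batches.
            try (FlightStream stream =
                sqlClient.getStream(info.getEndpoints().get(0).getTicket(), bearer)) {
              while (stream.next()) {
                System.out.println(stream.getRoot().contentToTSVString());
              }
            }
          }
        }
      }
    }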
---
external-service-impl/flight-sql/pom.xml | 121 +++++
.../apache/iotdb/flight/FlightSqlAuthHandler.java | 59 +++
.../iotdb/flight/FlightSqlAuthMiddleware.java | 89 ++++
.../org/apache/iotdb/flight/FlightSqlService.java | 134 ++++++
.../iotdb/flight/FlightSqlSessionManager.java | 149 ++++++
.../iotdb/flight/IoTDBFlightSqlProducer.java | 531 +++++++++++++++++++++
.../iotdb/flight/TsBlockToArrowConverter.java | 277 +++++++++++
.../iotdb/flight/TsBlockToArrowConverterTest.java | 493 +++++++++++++++++++
external-service-impl/pom.xml | 1 +
integration-test/pom.xml | 10 +
.../iotdb/it/env/cluster/ClusterConstant.java | 1 +
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../it/env/cluster/config/MppDataNodeConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../iotdb/it/env/cluster/env/AbstractEnv.java | 13 +-
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 9 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 5 +
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 5 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../apache/iotdb/itbase/env/DataNodeConfig.java | 2 +
.../it/flightsql/IoTDBArrowFlightSqlIT.java | 239 ++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 45 +-
.../externalservice/BuiltinExternalServices.java | 6 +-
pom.xml | 11 +
26 files changed, 2217 insertions(+), 11 deletions(-)
diff --git a/external-service-impl/flight-sql/pom.xml
b/external-service-impl/flight-sql/pom.xml
new file mode 100644
index 00000000000..be363c336b7
--- /dev/null
+++ b/external-service-impl/flight-sql/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>external-service-impl</artifactId>
+ <version>2.0.7-SNAPSHOT</version>
+ </parent>
+ <artifactId>flight-sql</artifactId>
+ <name>IoTDB: External-Service-Impl: Arrow Flight SQL</name>
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <!-- Arrow Flight SQL (bundled in jar-with-dependencies) -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+    <!-- IoTDB dependencies (provided at runtime by DataNode classloader) -->
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <scope>provided</scope>
+ <version>2.0.7-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>node-commons</artifactId>
+ <version>2.0.7-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${tsfile.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tsfile</groupId>
+ <artifactId>common</artifactId>
+ <version>${tsfile.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.version}</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <ignoredDependencies>
+            <ignoredDependency>org.apache.tsfile:common</ignoredDependency>
+ </ignoredDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
new file mode 100644
index 00000000000..b591665eab0
--- /dev/null
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Arrow Flight SQL credential validator using Arrow's built-in auth2 framework. Validates
+ * username/password credentials via IoTDB's SessionManager and returns a Bearer token string as the
+ * peer identity for subsequent requests.
+ *
+ * <p>Used with {@link BasicCallHeaderAuthenticator} and {@link
+ * org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator} to provide Basic → Bearer token
+ * authentication flow.
+ */
+public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.CredentialValidator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthHandler.class);
+ private final FlightSqlSessionManager sessionManager;
+
+ public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
+ @Override
+  public org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult validate(
+      String username, String password) {
+    LOGGER.debug("Validating credentials for user: {}", username);
+
+    try {
+      String token = sessionManager.authenticate(username, password, "unknown");
+      // Return the token as the peer identity; GeneratedBearerTokenAuthenticator
+      // wraps it in a Bearer token and sets it in the response header.
+      return () -> token;
+    } catch (SecurityException e) {
+      throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
+ }
+ }
+}
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java
new file mode 100644
index 00000000000..2cf0d048556
--- /dev/null
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.CallInfo;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightServerMiddleware;
+import org.apache.arrow.flight.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flight Server middleware for handling Bearer token / Basic authentication. Supports initial login
+ * via Basic auth header (username:password), returning a Bearer token. Subsequent requests use the
+ * Bearer token.
+ */
+public class FlightSqlAuthMiddleware implements FlightServerMiddleware {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthMiddleware.class);
+
+  /** The middleware key used to retrieve this middleware in the CallContext. */
+  public static final Key<FlightSqlAuthMiddleware> KEY = Key.of("flight-sql-auth-middleware");
+
+ private final CallHeaders incomingHeaders;
+
+ FlightSqlAuthMiddleware(CallHeaders incomingHeaders) {
+ this.incomingHeaders = incomingHeaders;
+ }
+
+ /** Returns the incoming call headers for session lookup. */
+ public CallHeaders getCallHeaders() {
+ return incomingHeaders;
+ }
+
+ @Override
+ public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
+ // no-op: token is set during Handshake response, not here
+ }
+
+ @Override
+ public void onCallCompleted(CallStatus status) {
+ // no-op
+ }
+
+ @Override
+ public void onCallErrored(Throwable err) {
+ // no-op
+ }
+
+ // ===================== Factory =====================
+
+ /** Factory that creates FlightSqlAuthMiddleware for each call. */
+  public static class Factory implements FlightServerMiddleware.Factory<FlightSqlAuthMiddleware> {
+
+ private final FlightSqlSessionManager sessionManager;
+
+ public Factory(FlightSqlSessionManager sessionManager) {
+ this.sessionManager = sessionManager;
+ }
+
+ @Override
+ public FlightSqlAuthMiddleware onCallStarted(
+        CallInfo callInfo, CallHeaders incomingHeaders, RequestContext context) {
+ return new FlightSqlAuthMiddleware(incomingHeaders);
+ }
+
+ public FlightSqlSessionManager getSessionManager() {
+ return sessionManager;
+ }
+ }
+}
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
new file mode 100644
index 00000000000..96480449fcc
--- /dev/null
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.externalservice.api.IExternalService;
+
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator;
+import org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Arrow Flight SQL service implementation for IoTDB. Implements the IExternalService interface to
+ * integrate with IoTDB's external service management framework (plugin lifecycle).
+ *
+ * <p>This service starts a gRPC-based Arrow Flight SQL server that allows clients to execute SQL
+ * queries using the Arrow Flight SQL protocol and receive results in columnar Arrow format.
+ */
+public class FlightSqlService implements IExternalService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlService.class);
+ private static final long SESSION_TIMEOUT_MINUTES = 30;
+
+ private FlightServer flightServer;
+ private BufferAllocator allocator;
+ private FlightSqlSessionManager flightSessionManager;
+ private IoTDBFlightSqlProducer producer;
+
+ @Override
+ public void start() {
+    int port = IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort();
+ LOGGER.info("Starting Arrow Flight SQL service on port {}", port);
+
+ try {
+ // Create the root allocator for Arrow memory management
+ allocator = new RootAllocator(Long.MAX_VALUE);
+
+ // Create session manager with TTL
+      flightSessionManager = new FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES);
+
+ // Create the auth handler
+      FlightSqlAuthHandler authHandler = new FlightSqlAuthHandler(flightSessionManager);
+
+ // Create the Flight SQL producer
+ producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager);
+
+ // Build the Flight server with auth2 Bearer token authentication
+ Location location = Location.forGrpcInsecure("0.0.0.0", port);
+ flightServer =
+ FlightServer.builder(allocator, location, producer)
+ .headerAuthenticator(
+ new GeneratedBearerTokenAuthenticator(
+ new BasicCallHeaderAuthenticator(authHandler)))
+ .build();
+
+ flightServer.start();
+ LOGGER.info(
+ "Arrow Flight SQL service started successfully on port {}",
flightServer.getPort());
+ } catch (IOException e) {
+ LOGGER.error("Failed to start Arrow Flight SQL service", e);
+ stop();
+      throw new RuntimeException("Failed to start Arrow Flight SQL service", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.info("Stopping Arrow Flight SQL service");
+
+ if (flightServer != null) {
+ try {
+ flightServer.shutdown();
+ flightServer.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for Flight server shutdown", e);
+ Thread.currentThread().interrupt();
+ try {
+ flightServer.close();
+ } catch (Exception ex) {
+ LOGGER.warn("Error force-closing Flight server", ex);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Error shutting down Flight server", e);
+ }
+ flightServer = null;
+ }
+
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing Flight SQL producer", e);
+ }
+ producer = null;
+ }
+
+ if (flightSessionManager != null) {
+ flightSessionManager.close();
+ flightSessionManager = null;
+ }
+
+ if (allocator != null) {
+ allocator.close();
+ allocator = null;
+ }
+
+ LOGGER.info("Arrow Flight SQL service stopped");
+ }
+}
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
new file mode 100644
index 00000000000..ad829888f22
--- /dev/null
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.arrow.flight.CallHeaders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages Arrow Flight SQL client sessions using Bearer token authentication. Maps Bearer tokens to
+ * IoTDB IClientSession objects with a TTL-based Caffeine cache.
+ */
+public class FlightSqlSessionManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlSessionManager.class);
+ private static final String AUTHORIZATION_HEADER = "authorization";
+ private static final String BEARER_PREFIX = "Bearer ";
+
+ private final SessionManager sessionManager = SessionManager.getInstance();
+
+ /** Cache of Bearer token -> IClientSession with configurable TTL. */
+ private final Cache<String, IClientSession> tokenCache;
+
+ public FlightSqlSessionManager(long sessionTimeoutMinutes) {
+ this.tokenCache =
+ Caffeine.newBuilder()
+ .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES)
+ .removalListener(
+ (String token, IClientSession session, RemovalCause cause) -> {
+ if (session != null && cause != RemovalCause.REPLACED) {
+ LOGGER.info("Flight SQL session expired, closing: {}",
session);
+ sessionManager.closeSession(
+ session,
+ queryId ->
+
org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance()
+ .cleanupQueryExecution(queryId),
+ false);
+ sessionManager.removeCurrSessionForMqtt(null); // handled
via sessions map only
+ }
+ })
+ .build();
+ }
+
+ /**
+ * Authenticates a user with username/password and returns a Bearer token.
+ *
+ * @param username the username
+ * @param password the password
+ * @param clientAddress the client's IP address
+ * @return the Bearer token if authentication succeeds
+ * @throws SecurityException if authentication fails
+ */
+  public String authenticate(String username, String password, String clientAddress) {
+ // Create a session for this client
+    IClientSession session = new InternalClientSession("FlightSQL-" + clientAddress);
+ session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+
+ // Register the session before login (MQTT pattern)
+ sessionManager.registerSessionForMqtt(session);
+
+ // Use SessionManager's login method
+    org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp loginResp = sessionManager.login(
+ session,
+ username,
+ password,
+ java.time.ZoneId.systemDefault().getId(),
+ SessionManager.CURRENT_RPC_VERSION,
+ IoTDBConstant.ClientVersion.V_1_0,
+ IClientSession.SqlDialect.TABLE);
+
+    if (loginResp.getCode() != org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Remove the session if login failed
+ sessionManager.removeCurrSessionForMqtt(null);
+      throw new SecurityException("Authentication failed: " + loginResp.getMessage());
+ }
+
+ // Generate Bearer token and store in cache
+ String token = UUID.randomUUID().toString();
+ tokenCache.put(token, session);
+ LOGGER.info("Flight SQL user '{}' authenticated, session: {}", username,
session);
+ return token;
+ }
+
+ /**
+   * Retrieves the IClientSession associated with a Bearer token from request headers.
+ *
+ * @param headers the Call headers containing the Authorization header
+ * @return the associated IClientSession
+ * @throws SecurityException if the token is invalid or expired
+ */
+ public IClientSession getSession(CallHeaders headers) {
+ String authHeader = headers.get(AUTHORIZATION_HEADER);
+ if (authHeader == null || !authHeader.startsWith(BEARER_PREFIX)) {
+ throw new SecurityException("Missing or invalid Authorization header");
+ }
+ String token = authHeader.substring(BEARER_PREFIX.length());
+ return getSessionByToken(token);
+ }
+
+ /**
+ * Retrieves the IClientSession associated with a Bearer token.
+ *
+ * @param token the Bearer token
+ * @return the associated IClientSession
+ * @throws SecurityException if the token is invalid or expired
+ */
+ public IClientSession getSessionByToken(String token) {
+ IClientSession session = tokenCache.getIfPresent(token);
+ if (session == null) {
+ throw new SecurityException("Invalid or expired Bearer token");
+ }
+ return session;
+ }
+
+ /** Invalidates all sessions and cleans up resources. */
+ public void close() {
+ tokenCache.invalidateAll();
+ tokenCache.cleanUp();
+ }
+}
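The token cache above is plain Caffeine with an access-based TTL and a removal listener. The same pattern in isolation looks roughly like this; the key/value types and the 30-minute TTL here are illustrative only, not taken from this patch.

    import java.util.concurrent.TimeUnit;

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class TokenCacheSketch {
      public static void main(String[] args) {
        Cache<String, String> tokens =
            Caffeine.newBuilder()
                .expireAfterAccess(30, TimeUnit.MINUTES)
                .removalListener(
                    (String token, String session, RemovalCause cause) -> {
                      // REPLACED is skipped, mirroring FlightSqlSessionManager, so that a
                      // refreshed entry does not close the session it still refers to.
                      if (cause != RemovalCause.REPLACED) {
                        System.out.println("release session for token " + token + " (" + cause + ")");
                      }
                    })
                .build();

        tokens.put("token-123", "session-1");
        tokens.getIfPresent("token-123"); // each read resets the expire-after-access clock
        tokens.invalidateAll();           // removal listener fires (possibly asynchronously)
      }
    }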
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
new file mode 100644
index 00000000000..067ccfb20d4
--- /dev/null
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.protobuf.ByteString;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Apache Arrow Flight SQL producer implementation for IoTDB. Handles SQL query execution via the
+ * Arrow Flight SQL protocol, using the Table model SQL dialect.
+ */
+public class IoTDBFlightSqlProducer implements FlightSqlProducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBFlightSqlProducer.class);
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ private final BufferAllocator allocator;
+ private final FlightSqlSessionManager flightSessionManager;
+ private final Coordinator coordinator = Coordinator.getInstance();
+ private final SessionManager sessionManager = SessionManager.getInstance();
+ private final SqlParser sqlParser = new SqlParser();
+  private final Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+
+  /** Stores query execution context by queryId for streaming results via getStream. */
+  private final ConcurrentHashMap<Long, QueryContext> activeQueries = new ConcurrentHashMap<>();
+
+ public IoTDBFlightSqlProducer(
+      BufferAllocator allocator, FlightSqlSessionManager flightSessionManager) {
+ this.allocator = allocator;
+ this.flightSessionManager = flightSessionManager;
+ }
+
+ // ===================== Session Retrieval =====================
+
+ /**
+   * Retrieves the IClientSession for the current call. The auth2 framework sets the peer identity
+   * to the Bearer token after handshake.
+   */
+  private IClientSession getSessionFromContext(CallContext context) {
+    String peerIdentity = context.peerIdentity();
+    if (peerIdentity == null || peerIdentity.isEmpty()) {
+      throw CallStatus.UNAUTHENTICATED.withDescription("Not authenticated").toRuntimeException();
+ }
+ try {
+ return flightSessionManager.getSessionByToken(peerIdentity);
+ } catch (SecurityException e) {
+      throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
+ }
+ }
+
+ // ===================== SQL Query Execution =====================
+
+ @Override
+ public FlightInfo getFlightInfoStatement(
+      FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) {
+ String sql = command.getQuery();
+ LOGGER.debug("getFlightInfoStatement: {}", sql);
+
+ IClientSession session = getSessionFromContext(context);
+
+ Long queryId = null;
+ try {
+ queryId = sessionManager.requestQueryId();
+
+ // Parse the SQL statement
+      Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault(), session);
+
+ // Execute via Coordinator (Table model)
+ ExecutionResult result =
+ coordinator.executeForTableModel(
+ statement,
+ sqlParser,
+ session,
+ queryId,
+ sessionManager.getSessionInfo(session),
+ sql,
+ metadata,
+ CONFIG.getQueryTimeoutThreshold(),
+ true);
+
+ if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ throw CallStatus.INTERNAL
+ .withDescription("Query execution failed: " +
result.status.getMessage())
+ .toRuntimeException();
+ }
+
+ IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
+ if (queryExecution == null) {
+ throw CallStatus.INTERNAL
+ .withDescription("Query execution not found after execution")
+ .toRuntimeException();
+ }
+
+ DatasetHeader header = queryExecution.getDatasetHeader();
+ Schema arrowSchema = TsBlockToArrowConverter.toArrowSchema(header);
+
+ // Store the query context for later getStream calls
+      activeQueries.put(queryId, new QueryContext(queryExecution, header, session));
+
+      // Build ticket containing the queryId
+      byte[] ticketBytes = Long.toString(queryId).getBytes(StandardCharsets.UTF_8);
+      Ticket ticket = new Ticket(ticketBytes);
+      FlightEndpoint endpoint = new FlightEndpoint(ticket);
+
+      return new FlightInfo(arrowSchema, descriptor, Collections.singletonList(endpoint), -1, -1);
+
+ } catch (RuntimeException e) {
+ // Cleanup on error
+ if (queryId != null) {
+ coordinator.cleanupQueryExecution(queryId);
+ activeQueries.remove(queryId);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public SchemaResult getSchemaStatement(
+      FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("getSchemaStatement not implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamStatement(
+ FlightSql.TicketStatementQuery ticketQuery,
+ CallContext context,
+ ServerStreamListener listener) {
+ ByteString handle = ticketQuery.getStatementHandle();
+ long queryId = Long.parseLong(handle.toStringUtf8());
+ streamQueryResults(queryId, listener);
+ }
+
+  /** Streams query results for a given queryId as Arrow VectorSchemaRoot batches. */
+  private void streamQueryResults(long queryId, ServerStreamListener listener) {
+ QueryContext ctx = activeQueries.get(queryId);
+ if (ctx == null) {
+ listener.error(
+ CallStatus.NOT_FOUND
+ .withDescription("Query not found for id: " + queryId)
+ .toRuntimeException());
+ return;
+ }
+
+ VectorSchemaRoot root = null;
+ try {
+      root = TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header, allocator);
+ listener.start(root);
+
+ while (true) {
+ Optional<TsBlock> optionalTsBlock =
ctx.queryExecution.getBatchResult();
+ if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) {
+ break;
+ }
+
+ TsBlock tsBlock = optionalTsBlock.get();
+ root.clear();
+        TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, ctx.header);
+ listener.putNext();
+ }
+
+ listener.completed();
+ } catch (IoTDBException e) {
+ LOGGER.error("Error streaming query results for queryId={}", queryId, e);
+      listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException());
+ } finally {
+ coordinator.cleanupQueryExecution(queryId);
+ activeQueries.remove(queryId);
+ if (root != null) {
+ root.close();
+ }
+ }
+ }
+
+ // ===================== Generic Flight Methods =====================
+
+ @Override
+ public void listFlights(
+ CallContext context, Criteria criteria, StreamListener<FlightInfo>
listener) {
+ listener.onCompleted();
+ }
+
+ // ===================== Prepared Statements =====================
+
+ @Override
+ public void createPreparedStatement(
+ FlightSql.ActionCreatePreparedStatementRequest request,
+ CallContext context,
+ StreamListener<Result> listener) {
+ listener.onError(
+ CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException());
+ }
+
+ @Override
+ public void closePreparedStatement(
+ FlightSql.ActionClosePreparedStatementRequest request,
+ CallContext context,
+ StreamListener<Result> listener) {
+ listener.onError(
+ CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException());
+ }
+
+ @Override
+ public FlightInfo getFlightInfoPreparedStatement(
+ FlightSql.CommandPreparedStatementQuery command,
+ CallContext context,
+ FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException();
+ }
+
+ @Override
+ public SchemaResult getSchemaPreparedStatement(
+ FlightSql.CommandPreparedStatementQuery command,
+ CallContext context,
+ FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamPreparedStatement(
+ FlightSql.CommandPreparedStatementQuery command,
+ CallContext context,
+ ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException();
+ }
+
+ // ===================== DML/Update =====================
+
+ @Override
+ public Runnable acceptPutStatement(
+ FlightSql.CommandStatementUpdate command,
+ CallContext context,
+ FlightStream flightStream,
+ StreamListener<PutResult> ackStream) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("Statement updates are not yet supported")
+ .toRuntimeException();
+ }
+
+ @Override
+ public Runnable acceptPutPreparedStatementUpdate(
+ FlightSql.CommandPreparedStatementUpdate command,
+ CallContext context,
+ FlightStream flightStream,
+ StreamListener<PutResult> ackStream) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException();
+ }
+
+ @Override
+ public Runnable acceptPutPreparedStatementQuery(
+ FlightSql.CommandPreparedStatementQuery command,
+ CallContext context,
+ FlightStream flightStream,
+ StreamListener<PutResult> ackStream) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("Prepared statements are not yet supported")
+ .toRuntimeException();
+ }
+
+ // ===================== Catalog/Schema/Table Metadata =====================
+
+ @Override
+ public FlightInfo getFlightInfoSqlInfo(
+      FlightSql.CommandGetSqlInfo command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetSqlInfo not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamSqlInfo(
+      FlightSql.CommandGetSqlInfo command, CallContext context, ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetSqlInfo not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoTypeInfo(
+      FlightSql.CommandGetXdbcTypeInfo command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetTypeInfo not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamTypeInfo(
+ FlightSql.CommandGetXdbcTypeInfo command,
+ CallContext context,
+ ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetTypeInfo not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoCatalogs(
+      FlightSql.CommandGetCatalogs command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetCatalogs not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+  public void getStreamCatalogs(CallContext context, ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetCatalogs not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoSchemas(
+      FlightSql.CommandGetDbSchemas command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetSchemas not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamSchemas(
+      FlightSql.CommandGetDbSchemas command, CallContext context, ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetSchemas not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoTables(
+      FlightSql.CommandGetTables command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetTables not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamTables(
+      FlightSql.CommandGetTables command, CallContext context, ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetTables not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoTableTypes(
+      FlightSql.CommandGetTableTypes command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetTableTypes not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+  public void getStreamTableTypes(CallContext context, ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetTableTypes not yet implemented")
+ .toRuntimeException();
+ }
+
+ // ===================== Keys =====================
+
+ @Override
+ public FlightInfo getFlightInfoPrimaryKeys(
+      FlightSql.CommandGetPrimaryKeys command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetPrimaryKeys not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamPrimaryKeys(
+      FlightSql.CommandGetPrimaryKeys command, CallContext context, ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetPrimaryKeys not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoExportedKeys(
+      FlightSql.CommandGetExportedKeys command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetExportedKeys not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamExportedKeys(
+ FlightSql.CommandGetExportedKeys command,
+ CallContext context,
+ ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetExportedKeys not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoImportedKeys(
+      FlightSql.CommandGetImportedKeys command, CallContext context, FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetImportedKeys not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamImportedKeys(
+ FlightSql.CommandGetImportedKeys command,
+ CallContext context,
+ ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetImportedKeys not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public FlightInfo getFlightInfoCrossReference(
+ FlightSql.CommandGetCrossReference command,
+ CallContext context,
+ FlightDescriptor descriptor) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetCrossReference not yet implemented")
+ .toRuntimeException();
+ }
+
+ @Override
+ public void getStreamCrossReference(
+ FlightSql.CommandGetCrossReference command,
+ CallContext context,
+ ServerStreamListener listener) {
+ throw CallStatus.UNIMPLEMENTED
+ .withDescription("GetCrossReference not yet implemented")
+ .toRuntimeException();
+ }
+
+ // ===================== Lifecycle =====================
+
+ @Override
+ public void close() throws Exception {
+ // Clean up all active queries
+ for (Long queryId : activeQueries.keySet()) {
+ try {
+ coordinator.cleanupQueryExecution(queryId);
+ } catch (Exception e) {
+ LOGGER.warn("Error cleaning up query {} during shutdown", queryId, e);
+ }
+ }
+ activeQueries.clear();
+ }
+
+ // ===================== Inner Classes =====================
+
+ /** Holds query execution context for streaming results. */
+ private static class QueryContext {
+
+ final IQueryExecution queryExecution;
+ final DatasetHeader header;
+ final IClientSession session;
+
+    QueryContext(IQueryExecution queryExecution, DatasetHeader header, IClientSession session) {
+ this.queryExecution = queryExecution;
+ this.header = header;
+ this.session = session;
+ }
+ }
+}
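Because the producer implements the stock CommandStatementQuery/TicketStatementQuery path, generic Flight SQL clients such as the Arrow Flight SQL JDBC driver (org.apache.arrow:flight-sql-jdbc-driver) should in principle be able to query it; note, however, that GetSqlInfo and the catalog/metadata commands above are still UNIMPLEMENTED, so drivers that require them during connection setup may not work yet. A hedged sketch with placeholder host, port, credentials and SQL:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class FlightSqlJdbcSketch {
      public static void main(String[] args) throws Exception {
        // useEncryption=false matches the insecure gRPC Location used by FlightSqlService.
        String url =
            "jdbc:arrow-flight-sql://127.0.0.1:8904/?useEncryption=false&user=root&password=root";
        try (Connection conn = DriverManager.getConnection(url);
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SELECT * FROM table1")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }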
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
new file mode 100644
index 00000000000..396f230d0b9
--- /dev/null
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Converts IoTDB TsBlock data to Apache Arrow VectorSchemaRoot format. */
+public class TsBlockToArrowConverter {
+
+ private TsBlockToArrowConverter() {
+ // utility class
+ }
+
+ /** Maps IoTDB TSDataType to Apache Arrow ArrowType. */
+ public static ArrowType toArrowType(TSDataType tsDataType) {
+ switch (tsDataType) {
+ case BOOLEAN:
+ return ArrowType.Bool.INSTANCE;
+ case INT32:
+ return new ArrowType.Int(32, true);
+ case INT64:
+ return new ArrowType.Int(64, true);
+ case FLOAT:
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ case DOUBLE:
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ case TEXT:
+ case STRING:
+ return ArrowType.Utf8.INSTANCE;
+ case BLOB:
+ return ArrowType.Binary.INSTANCE;
+ case TIMESTAMP:
+ return new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC");
+ case DATE:
+ return new ArrowType.Date(DateUnit.DAY);
+ default:
+        throw new IllegalArgumentException("Unsupported TSDataType: " + tsDataType);
+ }
+ }
+
+ /** Builds an Arrow Schema from an IoTDB DatasetHeader. */
+ public static Schema toArrowSchema(DatasetHeader header) {
+ List<String> columnNames = header.getRespColumns();
+ List<TSDataType> dataTypes = header.getRespDataTypes();
+ List<Field> fields = new ArrayList<>(columnNames.size());
+ for (int i = 0; i < columnNames.size(); i++) {
+ ArrowType arrowType = toArrowType(dataTypes.get(i));
+      fields.add(new Field(columnNames.get(i), new FieldType(true, arrowType, null), null));
+ }
+ return new Schema(fields);
+ }
+
+ /** Creates a new VectorSchemaRoot from an IoTDB DatasetHeader. */
+ public static VectorSchemaRoot createVectorSchemaRoot(
+ DatasetHeader header, BufferAllocator allocator) {
+ Schema arrowSchema = toArrowSchema(header);
+ return VectorSchemaRoot.create(arrowSchema, allocator);
+ }
+
+ /**
+ * Fills an existing VectorSchemaRoot with data from a TsBlock.
+ *
+ * @param root the VectorSchemaRoot to fill (cleared before filling)
+ * @param tsBlock the TsBlock containing the data
+ * @param header the DatasetHeader for column name to index mapping
+ */
+ public static void fillVectorSchemaRoot(
+ VectorSchemaRoot root, TsBlock tsBlock, DatasetHeader header) {
+
+ int rowCount = tsBlock.getPositionCount();
+ List<String> columnNames = header.getRespColumns();
+ List<TSDataType> dataTypes = header.getRespDataTypes();
+ Map<String, Integer> headerMap = header.getColumnNameIndexMap();
+
+ root.allocateNew();
+
+ for (int colIdx = 0; colIdx < columnNames.size(); colIdx++) {
+ String colName = columnNames.get(colIdx);
+ Integer sourceIdx = headerMap.get(colName);
+ Column column = tsBlock.getColumn(sourceIdx);
+ TSDataType dataType = dataTypes.get(colIdx);
+ FieldVector fieldVector = root.getVector(colIdx);
+
+ fillColumnVector(fieldVector, column, dataType, rowCount);
+ }
+
+ root.setRowCount(rowCount);
+ }
+
+ /** Fills an Arrow FieldVector from an IoTDB Column. */
+ private static void fillColumnVector(
+      FieldVector fieldVector, Column column, TSDataType dataType, int rowCount) {
+ switch (dataType) {
+ case BOOLEAN:
+ fillBooleanVector((BitVector) fieldVector, column, rowCount);
+ break;
+ case INT32:
+ fillInt32Vector((IntVector) fieldVector, column, rowCount);
+ break;
+ case INT64:
+ fillInt64Vector((BigIntVector) fieldVector, column, rowCount);
+ break;
+ case FLOAT:
+ fillFloatVector((Float4Vector) fieldVector, column, rowCount);
+ break;
+ case DOUBLE:
+ fillDoubleVector((Float8Vector) fieldVector, column, rowCount);
+ break;
+ case TEXT:
+ case STRING:
+ fillTextVector((VarCharVector) fieldVector, column, rowCount);
+ break;
+ case BLOB:
+ fillBlobVector((VarBinaryVector) fieldVector, column, rowCount);
+ break;
+ case TIMESTAMP:
+        fillTimestampVector((TimeStampMilliTZVector) fieldVector, column, rowCount);
+ break;
+ case DATE:
+ fillDateVector((DateDayVector) fieldVector, column, rowCount);
+ break;
+ default:
+        throw new IllegalArgumentException("Unsupported TSDataType: " + dataType);
+ }
+ }
+
+  private static void fillBooleanVector(BitVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getBoolean(i) ? 1 : 0);
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillInt32Vector(IntVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getInt(i));
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillInt64Vector(BigIntVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getLong(i));
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillFloatVector(Float4Vector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getFloat(i));
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillDoubleVector(Float8Vector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getDouble(i));
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillTextVector(VarCharVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ byte[] bytes =
+ column
+ .getBinary(i)
+ .getStringValue(TSFileConfig.STRING_CHARSET)
+ .getBytes(StandardCharsets.UTF_8);
+ vector.setSafe(i, bytes);
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillBlobVector(VarBinaryVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getBinary(i).getValues());
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+ private static void fillTimestampVector(
+ TimeStampMilliTZVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getLong(i));
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+
+  private static void fillDateVector(DateDayVector vector, Column column, int rowCount) {
+ for (int i = 0; i < rowCount; i++) {
+ if (column.isNull(i)) {
+ vector.setNull(i);
+ } else {
+ vector.setSafe(i, column.getInt(i));
+ }
+ }
+ vector.setValueCount(rowCount);
+ }
+}
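For consumers of the converter's output, the type mapping above means, for example, that INT64 columns arrive as BigIntVector and TEXT/STRING columns as VarCharVector. A generic way to walk a filled VectorSchemaRoot is sketched below; this helper is illustrative and not part of the patch.

    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.VectorSchemaRoot;

    public final class ArrowRowPrinter {
      private ArrowRowPrinter() {}

      /** Prints each row of a root filled by TsBlockToArrowConverter.fillVectorSchemaRoot. */
      public static void print(VectorSchemaRoot root) {
        for (int row = 0; row < root.getRowCount(); row++) {
          StringBuilder line = new StringBuilder();
          for (FieldVector vector : root.getFieldVectors()) {
            // getObject handles nulls and per-type decoding (e.g. VarCharVector -> Text,
            // TimeStampMilliTZVector -> Long epoch millis, DateDayVector -> Integer days).
            Object value = vector.isNull(row) ? null : vector.getObject(row);
            line.append(vector.getName()).append('=').append(value).append(' ');
          }
          System.out.println(line.toString().trim());
        }
      }
    }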
diff --git
a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java
b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java
new file mode 100644
index 00000000000..413e17e23b7
--- /dev/null
+++
b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/** Unit tests for TsBlockToArrowConverter. */
+public class TsBlockToArrowConverterTest {
+
+ private BufferAllocator allocator;
+
+ @Before
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @After
+ public void tearDown() {
+ allocator.close();
+ }
+
+ // ===================== toArrowType Tests =====================
+
+ @Test
+ public void testToArrowTypeMappings() {
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.BOOLEAN) instanceof ArrowType.Bool);
+
+    ArrowType int32Type = TsBlockToArrowConverter.toArrowType(TSDataType.INT32);
+    assertTrue(int32Type instanceof ArrowType.Int);
+    assertEquals(32, ((ArrowType.Int) int32Type).getBitWidth());
+    assertTrue(((ArrowType.Int) int32Type).getIsSigned());
+
+    ArrowType int64Type = TsBlockToArrowConverter.toArrowType(TSDataType.INT64);
+    assertTrue(int64Type instanceof ArrowType.Int);
+    assertEquals(64, ((ArrowType.Int) int64Type).getBitWidth());
+    assertTrue(((ArrowType.Int) int64Type).getIsSigned());
+
+    ArrowType floatType = TsBlockToArrowConverter.toArrowType(TSDataType.FLOAT);
+    assertTrue(floatType instanceof ArrowType.FloatingPoint);
+    assertEquals(
+        FloatingPointPrecision.SINGLE, ((ArrowType.FloatingPoint) floatType).getPrecision());
+
+    ArrowType doubleType = TsBlockToArrowConverter.toArrowType(TSDataType.DOUBLE);
+    assertTrue(doubleType instanceof ArrowType.FloatingPoint);
+    assertEquals(
+        FloatingPointPrecision.DOUBLE, ((ArrowType.FloatingPoint) doubleType).getPrecision());
+
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.TEXT) instanceof ArrowType.Utf8);
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.STRING) instanceof ArrowType.Utf8);
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.BLOB) instanceof ArrowType.Binary);
+
+    ArrowType tsType = TsBlockToArrowConverter.toArrowType(TSDataType.TIMESTAMP);
+    assertTrue(tsType instanceof ArrowType.Timestamp);
+    assertEquals(TimeUnit.MILLISECOND, ((ArrowType.Timestamp) tsType).getUnit());
+ assertEquals("UTC", ((ArrowType.Timestamp) tsType).getTimezone());
+
+ ArrowType dateType = TsBlockToArrowConverter.toArrowType(TSDataType.DATE);
+ assertTrue(dateType instanceof ArrowType.Date);
+ assertEquals(DateUnit.DAY, ((ArrowType.Date) dateType).getUnit());
+ }
+
+ // ===================== toArrowSchema Tests =====================
+
+ @Test
+ public void testToArrowSchema() {
+ DatasetHeader header = buildHeader("col_bool", TSDataType.BOOLEAN,
"col_int", TSDataType.INT32);
+ Schema schema = TsBlockToArrowConverter.toArrowSchema(header);
+
+ assertEquals(2, schema.getFields().size());
+ assertEquals("col_bool", schema.getFields().get(0).getName());
+ assertTrue(schema.getFields().get(0).getType() instanceof ArrowType.Bool);
+ assertEquals("col_int", schema.getFields().get(1).getName());
+ assertTrue(schema.getFields().get(1).getType() instanceof ArrowType.Int);
+ }
+
+ // ===================== INT32 Conversion =====================
+
+ @Test
+ public void testFillInt32Values() {
+ DatasetHeader header = buildHeader("value", TSDataType.INT32);
+ TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {10, 20,
30});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ IntVector vector = (IntVector) root.getVector(0);
+ assertEquals(10, vector.get(0));
+ assertEquals(20, vector.get(1));
+ assertEquals(30, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testFillInt32WithNulls() {
+ DatasetHeader header = buildHeader("value", TSDataType.INT32);
+ TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {10, null,
30});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ IntVector vector = (IntVector) root.getVector(0);
+ assertEquals(10, vector.get(0));
+ assertTrue(vector.isNull(1));
+ assertEquals(30, vector.get(2));
+ }
+ }
+
+ // ===================== INT64 Conversion =====================
+
+ @Test
+ public void testFillInt64Values() {
+ DatasetHeader header = buildHeader("value", TSDataType.INT64);
+ TsBlock tsBlock = buildTsBlock(TSDataType.INT64, new Object[] {100L, 200L,
300L});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ BigIntVector vector = (BigIntVector) root.getVector(0);
+ assertEquals(100L, vector.get(0));
+ assertEquals(200L, vector.get(1));
+ assertEquals(300L, vector.get(2));
+ }
+ }
+
+ // ===================== FLOAT Conversion =====================
+
+ @Test
+ public void testFillFloatValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.FLOAT);
+ TsBlock tsBlock = buildTsBlock(TSDataType.FLOAT, new Object[] {1.1f, 2.2f,
3.3f});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ Float4Vector vector = (Float4Vector) root.getVector(0);
+ assertEquals(1.1f, vector.get(0), 0.001f);
+ assertEquals(2.2f, vector.get(1), 0.001f);
+ assertEquals(3.3f, vector.get(2), 0.001f);
+ }
+ }
+
+ // ===================== DOUBLE Conversion =====================
+
+ @Test
+ public void testFillDoubleValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.DOUBLE);
+ TsBlock tsBlock = buildTsBlock(TSDataType.DOUBLE, new Object[] {1.11,
2.22, 3.33});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ Float8Vector vector = (Float8Vector) root.getVector(0);
+ assertEquals(1.11, vector.get(0), 0.0001);
+ assertEquals(2.22, vector.get(1), 0.0001);
+ assertEquals(3.33, vector.get(2), 0.0001);
+ }
+ }
+
+ // ===================== BOOLEAN Conversion =====================
+
+ @Test
+ public void testFillBooleanValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.BOOLEAN);
+ TsBlock tsBlock = buildTsBlock(TSDataType.BOOLEAN, new Object[] {true,
false, true});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ BitVector vector = (BitVector) root.getVector(0);
+ assertEquals(1, vector.get(0));
+ assertEquals(0, vector.get(1));
+ assertEquals(1, vector.get(2));
+ }
+ }
+
+ @Test
+ public void testFillBooleanWithNulls() {
+ DatasetHeader header = buildHeader("value", TSDataType.BOOLEAN);
+ TsBlock tsBlock = buildTsBlock(TSDataType.BOOLEAN, new Object[] {true,
null, false});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ BitVector vector = (BitVector) root.getVector(0);
+ assertEquals(1, vector.get(0));
+ assertTrue(vector.isNull(1));
+ assertEquals(0, vector.get(2));
+ }
+ }
+
+ // ===================== TEXT/STRING Conversion =====================
+
+ @Test
+ public void testFillTextValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.TEXT);
+ TsBlock tsBlock =
+ buildTsBlock(
+ TSDataType.TEXT,
+ new Object[] {
+ new Binary("hello", StandardCharsets.UTF_8),
+ new Binary("world", StandardCharsets.UTF_8),
+ new Binary("IoTDB", StandardCharsets.UTF_8)
+ });
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ VarCharVector vector = (VarCharVector) root.getVector(0);
+ assertEquals("hello", new String(vector.get(0), StandardCharsets.UTF_8));
+ assertEquals("world", new String(vector.get(1), StandardCharsets.UTF_8));
+ assertEquals("IoTDB", new String(vector.get(2), StandardCharsets.UTF_8));
+ }
+ }
+
+ @Test
+ public void testFillTextWithNulls() {
+ DatasetHeader header = buildHeader("value", TSDataType.TEXT);
+ TsBlock tsBlock =
+ buildTsBlock(
+ TSDataType.TEXT,
+ new Object[] {new Binary("hello", StandardCharsets.UTF_8), null,
null});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ VarCharVector vector = (VarCharVector) root.getVector(0);
+ assertEquals("hello", new String(vector.get(0), StandardCharsets.UTF_8));
+ assertTrue(vector.isNull(1));
+ assertTrue(vector.isNull(2));
+ }
+ }
+
+ // ===================== BLOB Conversion =====================
+
+ @Test
+ public void testFillBlobValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.BLOB);
+ byte[] bytes1 = {0x01, 0x02, 0x03};
+ byte[] bytes2 = {0x04, 0x05};
+ TsBlock tsBlock =
+ buildTsBlock(TSDataType.BLOB, new Object[] {new Binary(bytes1), new
Binary(bytes2)});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(2, root.getRowCount());
+ VarBinaryVector vector = (VarBinaryVector) root.getVector(0);
+ assertArrayEquals(bytes1, vector.get(0));
+ assertArrayEquals(bytes2, vector.get(1));
+ }
+ }
+
+ // ===================== TIMESTAMP Conversion =====================
+
+ @Test
+ public void testFillTimestampValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.TIMESTAMP);
+ TsBlock tsBlock =
+ buildTsBlock(
+ TSDataType.TIMESTAMP, new Object[] {1609459200000L,
1609459260000L, 1609459320000L});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ TimeStampMilliTZVector vector = (TimeStampMilliTZVector)
root.getVector(0);
+ assertEquals(1609459200000L, vector.get(0));
+ assertEquals(1609459260000L, vector.get(1));
+ assertEquals(1609459320000L, vector.get(2));
+ }
+ }
+
+ // ===================== DATE Conversion =====================
+
+ @Test
+ public void testFillDateValues() {
+ DatasetHeader header = buildHeader("value", TSDataType.DATE);
+ // Epoch days: 2021-01-01 = 18628
+ TsBlock tsBlock = buildTsBlock(TSDataType.DATE, new Object[] {18628,
18629, 18630});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(3, root.getRowCount());
+ DateDayVector vector = (DateDayVector) root.getVector(0);
+ assertEquals(18628, vector.get(0));
+ assertEquals(18629, vector.get(1));
+ assertEquals(18630, vector.get(2));
+ }
+ }
+
+ // ===================== Multi-Column Tests =====================
+
+ @Test
+ public void testFillMultipleColumns() {
+ List<ColumnHeader> headers = new ArrayList<>();
+ headers.add(new ColumnHeader("int_col", TSDataType.INT32));
+ headers.add(new ColumnHeader("double_col", TSDataType.DOUBLE));
+ headers.add(new ColumnHeader("text_col", TSDataType.TEXT));
+ DatasetHeader header = new DatasetHeader(headers, true);
+ List<String> colNames = header.getRespColumns();
+ header.setTreeColumnToTsBlockIndexMap(colNames);
+
+ List<TSDataType> types = Arrays.asList(TSDataType.INT32,
TSDataType.DOUBLE, TSDataType.TEXT);
+ TsBlockBuilder builder = new TsBlockBuilder(types);
+ ColumnBuilder[] cols = builder.getValueColumnBuilders();
+
+ // Row 1
+ cols[0].writeInt(42);
+ cols[1].writeDouble(3.14);
+ cols[2].writeBinary(new Binary("test", StandardCharsets.UTF_8));
+ builder.declarePosition();
+
+ // Row 2
+ cols[0].writeInt(100);
+ cols[1].writeDouble(2.71);
+ cols[2].writeBinary(new Binary("hello", StandardCharsets.UTF_8));
+ builder.declarePosition();
+
+ TsBlock tsBlock =
+ builder.build(
+ new RunLengthEncodedColumn(
+ new TimeColumn(1, new long[] {0L}),
builder.getPositionCount()));
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+ assertEquals(2, root.getRowCount());
+
+ IntVector intVec = (IntVector) root.getVector("int_col");
+ assertEquals(42, intVec.get(0));
+ assertEquals(100, intVec.get(1));
+
+ Float8Vector doubleVec = (Float8Vector) root.getVector("double_col");
+ assertEquals(3.14, doubleVec.get(0), 0.001);
+ assertEquals(2.71, doubleVec.get(1), 0.001);
+
+ VarCharVector textVec = (VarCharVector) root.getVector("text_col");
+ assertEquals("test", new String(textVec.get(0), StandardCharsets.UTF_8));
+ assertEquals("hello", new String(textVec.get(1),
StandardCharsets.UTF_8));
+ }
+ }
+
+ // ===================== Empty TsBlock =====================
+
+ @Test
+ public void testFillEmptyTsBlock() {
+ DatasetHeader header = buildHeader("value", TSDataType.INT32);
+ TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {});
+
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+ TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+ assertEquals(0, root.getRowCount());
+ }
+ }
+
+ // ===================== Helper Methods =====================
+
+ /** Build a simple DatasetHeader with one or more columns (name, type
pairs). */
+ private DatasetHeader buildHeader(Object... nameTypePairs) {
+ List<ColumnHeader> columnHeaders = new ArrayList<>();
+ for (int i = 0; i < nameTypePairs.length; i += 2) {
+ columnHeaders.add(
+ new ColumnHeader((String) nameTypePairs[i], (TSDataType)
nameTypePairs[i + 1]));
+ }
+ DatasetHeader header = new DatasetHeader(columnHeaders, true);
+ List<String> colNames = header.getRespColumns();
+ header.setTreeColumnToTsBlockIndexMap(colNames);
+ return header;
+ }
+
+ /**
+ * Build a single-column TsBlock with the given data type and values. Null
values in the array
+ * produce null entries in the TsBlock.
+ */
+ private TsBlock buildTsBlock(TSDataType type, Object[] values) {
+ TsBlockBuilder builder = new
TsBlockBuilder(java.util.Collections.singletonList(type));
+ ColumnBuilder[] cols = builder.getValueColumnBuilders();
+
+ for (Object value : values) {
+ if (value == null) {
+ cols[0].appendNull();
+ } else {
+ switch (type) {
+ case BOOLEAN:
+ cols[0].writeBoolean((Boolean) value);
+ break;
+ case INT32:
+ case DATE:
+ cols[0].writeInt((Integer) value);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ cols[0].writeLong((Long) value);
+ break;
+ case FLOAT:
+ cols[0].writeFloat((Float) value);
+ break;
+ case DOUBLE:
+ cols[0].writeDouble((Double) value);
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ cols[0].writeBinary((Binary) value);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+ }
+ builder.declarePosition();
+ }
+
+ return builder.build(
+ new RunLengthEncodedColumn(new TimeColumn(1, new long[] {0L}),
builder.getPositionCount()));
+ }
+}
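The tests above pin down the three static entry points of TsBlockToArrowConverter
(toArrowSchema, createVectorSchemaRoot, fillVectorSchemaRoot). As a rough sketch of how a
Flight SQL producer might drive them when streaming query results -- the listener wiring,
the tsBlocks iterator, and the assumption that fillVectorSchemaRoot overwrites the previous
batch are illustrative only, not code from this commit:

    // Sketch only: reuse one VectorSchemaRoot per stream and ship one Arrow batch
    // per TsBlock. "listener" is a FlightProducer.ServerStreamListener and
    // "tsBlocks" an Iterator<TsBlock> from query execution (both assumed here).
    try (VectorSchemaRoot root =
        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
      listener.start(root);                      // sends the Arrow schema first
      while (tsBlocks.hasNext()) {
        TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlocks.next(), header);
        listener.putNext();                      // ships the current batch
      }
      listener.completed();
    }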
diff --git a/external-service-impl/pom.xml b/external-service-impl/pom.xml
index 04483b289f3..511a58f2bef 100644
--- a/external-service-impl/pom.xml
+++ b/external-service-impl/pom.xml
@@ -33,6 +33,7 @@
<module>mqtt</module>
<module>rest</module>
<module>rest-openapi</module>
+ <module>flight-sql</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index a4776a57c55..4ef730957e4 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -246,6 +246,16 @@
<!--We will integrate rest-jar-with-dependencies into lib by
assembly plugin-->
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 82ed7976959..114556c8b68 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -170,6 +170,7 @@ public class ClusterConstant {
public static final String PIPE_LIB_DIR = "pipe_lib_dir";
public static final String REST_SERVICE_PORT = "rest_service_port";
public static final String INFLUXDB_RPC_PORT = "influxdb_rpc_port";
+ public static final String ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port";
// ConfigNode
public static final String CN_SYSTEM_DIR = "cn_system_dir";
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 53aa395491b..d043cc4fbb2 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -315,6 +315,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService) {
+ setProperty("enable_arrow_flight_sql_service",
String.valueOf(enableArrowFlightSqlService));
+ return this;
+ }
+
@Override
public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
setProperty("mqtt_payload_formatter",
String.valueOf(mqttPayloadFormatter));
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 5e418072a7d..8fcb242ca12 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -102,6 +102,12 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
return this;
}
+ @Override
+ public DataNodeConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService) {
+ setProperty("enable_arrow_flight_sql_service",
String.valueOf(enableArrowFlightSqlService));
+ return this;
+ }
+
@Override
public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
setProperty("mqtt_payload_formatter",
String.valueOf(mqttPayloadFormatter));
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index ed8a8943303..b73b55510c0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -316,6 +316,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ @Override
+ public CommonConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService) {
+ cnConfig.setEnableArrowFlightSqlService(enableArrowFlightSqlService);
+ dnConfig.setEnableArrowFlightSqlService(enableArrowFlightSqlService);
+ return this;
+ }
+
@Override
public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 8308225a9af..d223b1e32ee 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -955,7 +955,8 @@ public abstract class AbstractEnv implements BaseEnv {
}
// use this to avoid some runtimeExceptions when try to get jdbc connections.
- // because it is hard to add retry and handle exception when getting jdbc
connections in
+ // because it is hard to add retry and handle exception when getting jdbc
+ // connections in
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.
// so use this function to add retry when cluster is ready.
// after retryCount times, if the jdbc can't connect, throw
@@ -1426,7 +1427,8 @@ public abstract class AbstractEnv implements BaseEnv {
final String endpoint = nodes.get(j).getIpAndPortString();
if (!nodeIds.containsKey(endpoint)) {
// Node not exist
- // Notice: Never modify this line, since the NodeLocation might be
modified in IT
+ // Notice: Never modify this line, since the NodeLocation might be
modified in
+ // IT
errorMessages.add("The node " + nodes.get(j).getIpAndPortString()
+ " is not found!");
continue;
}
@@ -1463,6 +1465,13 @@ public abstract class AbstractEnv implements BaseEnv {
.getMqttPort();
}
+ @Override
+ public int getArrowFlightSqlPort() {
+ return dataNodeWrapperList
+ .get(new
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()))
+ .getArrowFlightSqlPort();
+ }
+
@Override
public String getIP() {
return dataNodeWrapperList.get(0).getIp();
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index 96a0fbe27e0..c4111c0f5f6 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static
org.apache.iotdb.it.env.cluster.ClusterConstant.ARROW_FLIGHT_SQL_PORT;
import static
org.apache.iotdb.it.env.cluster.ClusterConstant.CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS;
import static
org.apache.iotdb.it.env.cluster.ClusterConstant.DATANODE_INIT_HEAP_SIZE;
import static
org.apache.iotdb.it.env.cluster.ClusterConstant.DATANODE_MAX_DIRECT_MEMORY_SIZE;
@@ -78,6 +79,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
private final int dataRegionConsensusPort;
private final int schemaRegionConsensusPort;
private final int mqttPort;
+ private final int arrowFlightSqlPort;
private final int restServicePort;
private final int pipeAirGapReceiverPort;
@@ -103,6 +105,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
this.schemaRegionConsensusPort = portList[4];
this.mqttPort = portList[5];
this.pipeAirGapReceiverPort = portList[6];
+ this.arrowFlightSqlPort = portList[7];
this.restServicePort = portList[10] + 6000;
this.defaultNodePropertiesFile =
EnvUtils.getFilePathFromSysVar(DEFAULT_DATA_NODE_PROPERTIES,
clusterIndex);
@@ -120,6 +123,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData");
immutableCommonProperties.setProperty(
PIPE_AIR_GAP_RECEIVER_PORT,
String.valueOf(this.pipeAirGapReceiverPort));
+ immutableCommonProperties.setProperty(
+ ARROW_FLIGHT_SQL_PORT, String.valueOf(this.arrowFlightSqlPort));
immutableNodeProperties.setProperty(REST_SERVICE_PORT,
String.valueOf(restServicePort));
@@ -304,6 +309,10 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
return pipeAirGapReceiverPort;
}
+ public int getArrowFlightSqlPort() {
+ return arrowFlightSqlPort;
+ }
+
public int getRestServicePort() {
return restServicePort;
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0c05af17f6a..db1a713ee09 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -221,6 +221,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService) {
+ return this;
+ }
+
@Override
public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index bba4c964f95..d4526ad2834 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -64,6 +64,11 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
return this;
}
+ @Override
+ public DataNodeConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService) {
+ return this;
+ }
+
@Override
public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 586eff60494..5ad15d43e44 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -480,6 +480,11 @@ public class RemoteServerEnv implements BaseEnv {
throw new UnsupportedOperationException();
}
+ @Override
+ public int getArrowFlightSqlPort() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String getIP() {
return ip_addr;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 8b32deb3ea8..43175676ab8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -332,6 +332,8 @@ public interface BaseEnv {
int getMqttPort();
+ int getArrowFlightSqlPort();
+
String getIP();
String getPort();
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 95ac239e89b..d5af1068ec9 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -102,6 +102,8 @@ public interface CommonConfig {
CommonConfig setEnableMQTTService(boolean enableMQTTService);
+ CommonConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService);
+
CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
CommonConfig setSchemaEngineMode(String schemaEngineMode);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index d57015b1396..9cbdf393e8d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -40,6 +40,8 @@ public interface DataNodeConfig {
DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
+ DataNodeConfig setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService);
+
DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
new file mode 100644
index 00000000000..d931ca77999
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.flightsql;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.auth2.BasicAuthCredentialWriter;
+import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
+import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for Arrow Flight SQL service in IoTDB. Tests the
end-to-end flow: client
+ * connects via Flight SQL protocol, authenticates, executes SQL queries, and
receives
+ * Arrow-formatted results.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBArrowFlightSqlIT {
+
+ private static final String DATABASE = "flightsql_test_db";
+ private static final String USER = "root";
+ private static final String PASSWORD = "root";
+
+ private BufferAllocator allocator;
+ private FlightClient flightClient;
+ private FlightSqlClient flightSqlClient;
+ private CredentialCallOption bearerToken;
+
+ @Before
+ public void setUp() throws Exception {
+ // Configure and start the cluster with Arrow Flight SQL enabled
+ BaseEnv baseEnv = EnvFactory.getEnv();
+
baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true);
+ baseEnv.initClusterEnvironment();
+
+ // Get the Flight SQL port from the data node
+ int port = EnvFactory.getEnv().getArrowFlightSqlPort();
+
+ // Create Arrow allocator and Flight client with Bearer token auth
middleware
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ Location location = Location.forGrpcInsecure("127.0.0.1", port);
+
+ // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from
the
+ // auth handshake
+ ClientIncomingAuthHeaderMiddleware.Factory authFactory =
+ new ClientIncomingAuthHeaderMiddleware.Factory(new
ClientBearerHeaderHandler());
+
+ flightClient = FlightClient.builder(allocator,
location).intercept(authFactory).build();
+
+    // Basic credentials are sent with each call; the auth middleware above captures
+    // any Bearer token the server issues on the first authenticated request
+    bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD));
+
+ // Wrap in FlightSqlClient for Flight SQL protocol operations
+ flightSqlClient = new FlightSqlClient(flightClient);
+
+ // Use the standard session to create the test database and table with data
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " +
DATABASE);
+ }
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
+ session.executeNonQueryStatement(
+ "CREATE TABLE test_table ("
+ + "id1 STRING TAG, "
+ + "s1 INT32 FIELD, "
+ + "s2 INT64 FIELD, "
+ + "s3 FLOAT FIELD, "
+ + "s4 DOUBLE FIELD, "
+ + "s5 BOOLEAN FIELD, "
+ + "s6 TEXT FIELD)");
+ session.executeNonQueryStatement(
+ "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) "
+ + "VALUES(1, 'device1', 100, 1000, 1.5, 2.5, true, 'hello')");
+ session.executeNonQueryStatement(
+ "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) "
+ + "VALUES(2, 'device1', 200, 2000, 3.5, 4.5, false, 'world')");
+ session.executeNonQueryStatement(
+ "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) "
+ + "VALUES(3, 'device2', 300, 3000, 5.5, 6.5, true, 'iotdb')");
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (flightSqlClient != null) {
+ try {
+ flightSqlClient.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (flightClient != null) {
+ try {
+ flightClient.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (allocator != null) {
+ allocator.close();
+ }
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testQueryWithAllDataTypes() throws Exception {
+ FlightInfo flightInfo =
+ flightSqlClient.execute(
+ "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY
time", bearerToken);
+
+ // Validate schema
+ Schema schema = flightInfo.getSchema();
+ assertNotNull("Schema should not be null", schema);
+ List<Field> fields = schema.getFields();
+ assertEquals("Should have 8 columns", 8, fields.size());
+
+ // Fetch all data
+ List<List<String>> rows = fetchAllRows(flightInfo);
+ assertEquals("Should have 3 rows", 3, rows.size());
+ }
+
+ @Test
+ public void testQueryWithFilter() throws Exception {
+ FlightInfo flightInfo =
+ flightSqlClient.execute(
+ "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY
time", bearerToken);
+
+ List<List<String>> rows = fetchAllRows(flightInfo);
+ assertEquals("Should have 2 rows for device1", 2, rows.size());
+ }
+
+ @Test
+ public void testQueryWithAggregation() throws Exception {
+ FlightInfo flightInfo =
+ flightSqlClient.execute(
+ "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum "
+ + "FROM test_table GROUP BY id1 ORDER BY id1",
+ bearerToken);
+
+ List<List<String>> rows = fetchAllRows(flightInfo);
+ assertEquals("Should have 2 groups", 2, rows.size());
+ }
+
+ @Test
+ public void testEmptyResult() throws Exception {
+ FlightInfo flightInfo =
+ flightSqlClient.execute("SELECT * FROM test_table WHERE id1 =
'nonexistent'", bearerToken);
+
+ List<List<String>> rows = fetchAllRows(flightInfo);
+ assertEquals("Should have 0 rows", 0, rows.size());
+ }
+
+ @Test
+ public void testShowDatabases() throws Exception {
+ FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES",
bearerToken);
+
+ List<List<String>> rows = fetchAllRows(flightInfo);
+ assertTrue("Should have at least 1 database", rows.size() >= 1);
+
+ boolean found = false;
+ for (List<String> row : rows) {
+ for (String val : row) {
+ if (val.contains(DATABASE)) {
+ found = true;
+ break;
+ }
+ }
+ }
+ assertTrue("Should find test database " + DATABASE, found);
+ }
+
+ /**
+ * Fetches all rows from all endpoints in a FlightInfo. Each row is a list
of string
+ * representations of the column values.
+ */
+ private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws
Exception {
+ List<List<String>> rows = new ArrayList<>();
+ for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+ try (FlightStream stream =
flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) {
+ while (stream.next()) {
+ VectorSchemaRoot root = stream.getRoot();
+ int rowCount = root.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ List<String> row = new ArrayList<>();
+ for (FieldVector vector : root.getFieldVectors()) {
+ Object value = vector.getObject(i);
+ row.add(value == null ? "null" : value.toString());
+ }
+ rows.add(row);
+ }
+ }
+ }
+ }
+ return rows;
+ }
+}
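The IT above talks to the service with a hand-built FlightClient; most downstream tools
would instead go through the Arrow Flight SQL JDBC driver. A minimal sketch, assuming
org.apache.arrow:flight-sql-jdbc-driver is on the classpath, the default
arrow_flight_sql_port of 8904 (see the IoTDBConfig change below), and a plaintext local
deployment -- host, port, and credentials are assumptions, not part of this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class FlightSqlJdbcExample {
      public static void main(String[] args) throws Exception {
        // URL parameters follow the Arrow Flight SQL JDBC driver's documented form.
        String url =
            "jdbc:arrow-flight-sql://127.0.0.1:8904/?useEncryption=false&user=root&password=root";
        try (Connection conn = DriverManager.getConnection(url);
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }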
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 98c15a2d9bf..3fef5d79fe1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -95,7 +95,7 @@ public class IoTDBConfig {
// e.g., a31+/$%#&[]{}3e4, "a.b", 'a.b'
private static final String NODE_NAME_MATCHER = "([^\n\t]+)";
- // e.g., .s1
+ // e.g., .s1
private static final String PARTIAL_NODE_MATCHER = "[" + PATH_SEPARATOR +
"]" + NODE_NAME_MATCHER;
private static final String NODE_MATCHER =
@@ -106,6 +106,12 @@ public class IoTDBConfig {
/** Whether to enable the mqtt service. */
private boolean enableMQTTService = false;
+ /** Whether to enable the Arrow Flight SQL service. */
+ private boolean enableArrowFlightSqlService = false;
+
+ /** The Arrow Flight SQL service binding port. */
+ private int arrowFlightSqlPort = 8904;
+
/** The mqtt service binding host. */
private String mqttHost = "127.0.0.1";
@@ -703,14 +709,16 @@ public class IoTDBConfig {
private int compactionMaxAlignedSeriesNumInOneBatch = 10;
/*
- * How many thread will be set up to perform continuous queries. When <= 0,
use max(1, CPU core number / 2).
+ * How many thread will be set up to perform continuous queries. When <= 0,
use
+ * max(1, CPU core number / 2).
*/
private int continuousQueryThreadNum =
Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
/*
* Minimum every interval to perform continuous query.
- * The every interval of continuous query instances should not be lower than
this limit.
+ * The every interval of continuous query instances should not be lower than
+ * this limit.
*/
private long continuousQueryMinimumEveryInterval = 1000;
@@ -789,7 +797,8 @@ public class IoTDBConfig {
private int tagAttributeFlushInterval = 1000;
// In one insert (one device, one timestamp, multiple measurements),
- // if enable partial insert, one measurement failure will not impact other
measurements
+ // if enable partial insert, one measurement failure will not impact other
+ // measurements
private boolean enablePartialInsert = true;
private boolean enable13DataInsertAdapt = false;
@@ -1108,7 +1117,8 @@ public class IoTDBConfig {
private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
private int loadTsFileAnalyzeSchemaBatchFlushTableDeviceNumber = 4096; //
For table model
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
- 0L; // 0 means that the decision will be adaptive based on the number of
sequences
+ 0L; // 0 means that the decision will be adaptive based on the
+ // number of sequences
private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
@@ -1449,7 +1459,8 @@ public class IoTDBConfig {
SystemMetrics.getInstance().setDiskDirs(diskDirs);
}
- // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the
same with IOTDB_HOME
+ // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the
same
+ // with IOTDB_HOME
// In this way, we can keep consistent with v0.13.0~2.
public static String addDataHomeDir(final String dir) {
String dataHomeDir = System.getProperty(IoTDBConstant.IOTDB_DATA_HOME,
null);
@@ -1518,7 +1529,8 @@ public class IoTDBConfig {
public void setTierDataDirs(String[][] tierDataDirs) {
formulateDataDirs(tierDataDirs);
this.tierDataDirs = tierDataDirs;
- // TODO(szywilliam): rewrite the logic here when ratis supports complete
snapshot semantic
+ // TODO(szywilliam): rewrite the logic here when ratis supports complete
+ // snapshot semantic
setRatisDataRegionSnapshotDir(
tierDataDirs[0][0] + File.separator +
IoTDBConstant.SNAPSHOT_FOLDER_NAME);
}
@@ -1730,7 +1742,8 @@ public class IoTDBConfig {
public void checkMultiDirStrategyClassName() {
confirmMultiDirStrategy();
for (String multiDirStrategy : CLUSTER_ALLOWED_MULTI_DIR_STRATEGIES) {
- // If the multiDirStrategyClassName is one of cluster allowed strategy,
the check is passed.
+ // If the multiDirStrategyClassName is one of cluster allowed strategy,
the
+ // check is passed.
if (multiDirStrategyClassName.equals(multiDirStrategy)
|| multiDirStrategyClassName.equals(MULTI_DIR_STRATEGY_PREFIX +
multiDirStrategy)) {
return;
@@ -2535,6 +2548,22 @@ public class IoTDBConfig {
this.enableMQTTService = enableMQTTService;
}
+ public boolean isEnableArrowFlightSqlService() {
+ return enableArrowFlightSqlService;
+ }
+
+ public void setEnableArrowFlightSqlService(boolean
enableArrowFlightSqlService) {
+ this.enableArrowFlightSqlService = enableArrowFlightSqlService;
+ }
+
+ public int getArrowFlightSqlPort() {
+ return arrowFlightSqlPort;
+ }
+
+ public void setArrowFlightSqlPort(int arrowFlightSqlPort) {
+ this.arrowFlightSqlPort = arrowFlightSqlPort;
+ }
+
public String getMqttHost() {
return mqttHost;
}
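On the configuration side, the IT framework above feeds the same switches through the
properties enable_arrow_flight_sql_service and arrow_flight_sql_port. Assuming the
DataNode properties loader maps those keys onto the two new IoTDBConfig fields (the
mapping itself is not shown in this diff), enabling the service would look roughly like:

    # Sketch of the DataNode properties entries; key names are taken from the IT
    # framework above, the exact properties file is an assumption.
    enable_arrow_flight_sql_service=true
    arrow_flight_sql_port=8904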
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
index 157734aef56..820a5b75822 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
@@ -32,7 +32,11 @@ public enum BuiltinExternalServices {
REST(
"REST",
"org.apache.iotdb.rest.RestService",
-
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService);
+
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService),
+ FLIGHT_SQL(
+ "FLIGHT_SQL",
+ "org.apache.iotdb.flight.FlightSqlService",
+
IoTDBDescriptor.getInstance().getConfig()::isEnableArrowFlightSqlService);
private final String serviceName;
private final String className;
diff --git a/pom.xml b/pom.xml
index 99ba9725ca6..334b89200b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,7 @@
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
<tsfile.version>2.2.1-260206-SNAPSHOT</tsfile.version>
+ <arrow.version>17.0.0</arrow.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
@@ -748,6 +749,16 @@
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<build>