This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ai-code/flight-sql
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5d60e292e88cf7ca749854b370459c62253ded99
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Feb 12 20:57:26 2026 +0800

    Add Arrow Flight SQL service as external service plugin
    - Add new external-service-impl/flight-sql module with Arrow 17.0.0
    - Implement FlightSqlService (IExternalService lifecycle management)
    - Implement IoTDBFlightSqlProducer (SQL execution via Coordinator, 
TsBlock→Arrow streaming)
    - Implement TsBlockToArrowConverter supporting all 10 data types
    - Implement Bearer token authentication (FlightSqlAuthHandler, 
FlightSqlSessionManager)
    - Add enableArrowFlightSqlService and arrowFlightSqlPort config to 
IoTDBConfig
    - Add FLIGHT_SQL entry to BuiltinExternalServices
    - Add Arrow Flight SQL port allocation and configuration to IT framework 
(13 files)
    - Add IoTDBArrowFlightSqlIT with 5 integration test cases
    - Add unit tests for TsBlockToArrowConverter (16 test cases)
---
 external-service-impl/flight-sql/pom.xml           | 121 +++++
 .../apache/iotdb/flight/FlightSqlAuthHandler.java  |  59 +++
 .../iotdb/flight/FlightSqlAuthMiddleware.java      |  89 ++++
 .../org/apache/iotdb/flight/FlightSqlService.java  | 134 ++++++
 .../iotdb/flight/FlightSqlSessionManager.java      | 149 ++++++
 .../iotdb/flight/IoTDBFlightSqlProducer.java       | 531 +++++++++++++++++++++
 .../iotdb/flight/TsBlockToArrowConverter.java      | 277 +++++++++++
 .../iotdb/flight/TsBlockToArrowConverterTest.java  | 493 +++++++++++++++++++
 external-service-impl/pom.xml                      |   1 +
 integration-test/pom.xml                           |  10 +
 .../iotdb/it/env/cluster/ClusterConstant.java      |   1 +
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../it/env/cluster/config/MppDataNodeConfig.java   |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  13 +-
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |   9 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |   5 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   5 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   2 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   2 +
 .../it/flightsql/IoTDBArrowFlightSqlIT.java        | 239 ++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  45 +-
 .../externalservice/BuiltinExternalServices.java   |   6 +-
 pom.xml                                            |  11 +
 26 files changed, 2217 insertions(+), 11 deletions(-)

diff --git a/external-service-impl/flight-sql/pom.xml 
b/external-service-impl/flight-sql/pom.xml
new file mode 100644
index 00000000000..be363c336b7
--- /dev/null
+++ b/external-service-impl/flight-sql/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>external-service-impl</artifactId>
+        <version>2.0.7-SNAPSHOT</version>
+    </parent>
+    <artifactId>flight-sql</artifactId>
+    <name>IoTDB: External-Service-Impl: Arrow Flight SQL</name>
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <!-- Arrow Flight SQL (bundled in jar-with-dependencies) -->
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>flight-sql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <!-- IoTDB dependencies (provided at runtime by DataNode classloader) 
-->
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <scope>provided</scope>
+            <version>2.0.7-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>node-commons</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tsfile</groupId>
+            <artifactId>tsfile</artifactId>
+            <version>${tsfile.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tsfile</groupId>
+            <artifactId>common</artifactId>
+            <version>${tsfile.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven.assembly.version}</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <!-- this is used for inheritance merges -->
+                        <phase>package</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <ignoredDependencies>
+                        
<ignoredDependency>org.apache.tsfile:common</ignoredDependency>
+                    </ignoredDependencies>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
new file mode 100644
index 00000000000..b591665eab0
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Arrow Flight SQL credential validator using Arrow's built-in auth2 
framework. Validates
+ * username/password credentials via IoTDB's SessionManager and returns a 
Bearer token string as the
+ * peer identity for subsequent requests.
+ *
+ * <p>Used with {@link BasicCallHeaderAuthenticator} and {@link
+ * org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator} to provide 
Basic → Bearer token
+ * authentication flow.
+ */
+public class FlightSqlAuthHandler implements 
BasicCallHeaderAuthenticator.CredentialValidator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlightSqlAuthHandler.class);
+  private final FlightSqlSessionManager sessionManager;
+
+  public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) {
+    this.sessionManager = sessionManager;
+  }
+
+  @Override
+  public org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult 
validate(
+      String username, String password) {
+    LOGGER.debug("Validating credentials for user: {}", username);
+
+    try {
+      String token = sessionManager.authenticate(username, password, 
"unknown");
+      // Return the token as the peer identity; 
GeneratedBearerTokenAuthenticator
+      // wraps it in a Bearer token and sets it in the response header.
+      return () -> token;
+    } catch (SecurityException e) {
+      throw 
CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
+    }
+  }
+}
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java
new file mode 100644
index 00000000000..2cf0d048556
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.CallInfo;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightServerMiddleware;
+import org.apache.arrow.flight.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flight Server middleware for handling Bearer token / Basic authentication. 
Supports initial login
+ * via Basic auth header (username:password), returning a Bearer token. 
Subsequent requests use the
+ * Bearer token.
+ */
+public class FlightSqlAuthMiddleware implements FlightServerMiddleware {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlightSqlAuthMiddleware.class);
+
+  /** The middleware key used to retrieve this middleware in the CallContext. 
*/
+  public static final Key<FlightSqlAuthMiddleware> KEY = 
Key.of("flight-sql-auth-middleware");
+
+  private final CallHeaders incomingHeaders;
+
+  FlightSqlAuthMiddleware(CallHeaders incomingHeaders) {
+    this.incomingHeaders = incomingHeaders;
+  }
+
+  /** Returns the incoming call headers for session lookup. */
+  public CallHeaders getCallHeaders() {
+    return incomingHeaders;
+  }
+
+  @Override
+  public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
+    // no-op: token is set during Handshake response, not here
+  }
+
+  @Override
+  public void onCallCompleted(CallStatus status) {
+    // no-op
+  }
+
+  @Override
+  public void onCallErrored(Throwable err) {
+    // no-op
+  }
+
+  // ===================== Factory =====================
+
+  /** Factory that creates FlightSqlAuthMiddleware for each call. */
+  public static class Factory implements 
FlightServerMiddleware.Factory<FlightSqlAuthMiddleware> {
+
+    private final FlightSqlSessionManager sessionManager;
+
+    public Factory(FlightSqlSessionManager sessionManager) {
+      this.sessionManager = sessionManager;
+    }
+
+    @Override
+    public FlightSqlAuthMiddleware onCallStarted(
+        CallInfo callInfo, CallHeaders incomingHeaders, RequestContext 
context) {
+      return new FlightSqlAuthMiddleware(incomingHeaders);
+    }
+
+    public FlightSqlSessionManager getSessionManager() {
+      return sessionManager;
+    }
+  }
+}
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
new file mode 100644
index 00000000000..96480449fcc
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.externalservice.api.IExternalService;
+
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator;
+import org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Arrow Flight SQL service implementation for IoTDB. Implements the 
IExternalService interface to
+ * integrate with IoTDB's external service management framework (plugin 
lifecycle).
+ *
+ * <p>This service starts a gRPC-based Arrow Flight SQL server that allows 
clients to execute SQL
+ * queries using the Arrow Flight SQL protocol and receive results in columnar 
Arrow format.
+ */
+public class FlightSqlService implements IExternalService {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlightSqlService.class);
+  private static final long SESSION_TIMEOUT_MINUTES = 30;
+
+  private FlightServer flightServer;
+  private BufferAllocator allocator;
+  private FlightSqlSessionManager flightSessionManager;
+  private IoTDBFlightSqlProducer producer;
+
+  @Override
+  public void start() {
+    int port = 
IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort();
+    LOGGER.info("Starting Arrow Flight SQL service on port {}", port);
+
+    try {
+      // Create the root allocator for Arrow memory management
+      allocator = new RootAllocator(Long.MAX_VALUE);
+
+      // Create session manager with TTL
+      flightSessionManager = new 
FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES);
+
+      // Create the auth handler
+      FlightSqlAuthHandler authHandler = new 
FlightSqlAuthHandler(flightSessionManager);
+
+      // Create the Flight SQL producer
+      producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager);
+
+      // Build the Flight server with auth2 Bearer token authentication
+      Location location = Location.forGrpcInsecure("0.0.0.0", port);
+      flightServer =
+          FlightServer.builder(allocator, location, producer)
+              .headerAuthenticator(
+                  new GeneratedBearerTokenAuthenticator(
+                      new BasicCallHeaderAuthenticator(authHandler)))
+              .build();
+
+      flightServer.start();
+      LOGGER.info(
+          "Arrow Flight SQL service started successfully on port {}", 
flightServer.getPort());
+    } catch (IOException e) {
+      LOGGER.error("Failed to start Arrow Flight SQL service", e);
+      stop();
+      throw new RuntimeException("Failed to start Arrow Flight SQL service", 
e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.info("Stopping Arrow Flight SQL service");
+
+    if (flightServer != null) {
+      try {
+        flightServer.shutdown();
+        flightServer.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted while waiting for Flight server shutdown", e);
+        Thread.currentThread().interrupt();
+        try {
+          flightServer.close();
+        } catch (Exception ex) {
+          LOGGER.warn("Error force-closing Flight server", ex);
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Error shutting down Flight server", e);
+      }
+      flightServer = null;
+    }
+
+    if (producer != null) {
+      try {
+        producer.close();
+      } catch (Exception e) {
+        LOGGER.warn("Error closing Flight SQL producer", e);
+      }
+      producer = null;
+    }
+
+    if (flightSessionManager != null) {
+      flightSessionManager.close();
+      flightSessionManager = null;
+    }
+
+    if (allocator != null) {
+      allocator.close();
+      allocator = null;
+    }
+
+    LOGGER.info("Arrow Flight SQL service stopped");
+  }
+}
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
new file mode 100644
index 00000000000..ad829888f22
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.arrow.flight.CallHeaders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manages Arrow Flight SQL client sessions using Bearer token authentication. 
Maps Bearer tokens to
+ * IoTDB IClientSession objects with a TTL-based Caffeine cache.
+ */
+public class FlightSqlSessionManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlightSqlSessionManager.class);
+  private static final String AUTHORIZATION_HEADER = "authorization";
+  private static final String BEARER_PREFIX = "Bearer ";
+
+  private final SessionManager sessionManager = SessionManager.getInstance();
+
+  /** Cache of Bearer token -> IClientSession with configurable TTL. */
+  private final Cache<String, IClientSession> tokenCache;
+
+  public FlightSqlSessionManager(long sessionTimeoutMinutes) {
+    this.tokenCache =
+        Caffeine.newBuilder()
+            .expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES)
+            .removalListener(
+                (String token, IClientSession session, RemovalCause cause) -> {
+                  if (session != null && cause != RemovalCause.REPLACED) {
+                    LOGGER.info("Flight SQL session expired, closing: {}", 
session);
+                    sessionManager.closeSession(
+                        session,
+                        queryId ->
+                            
org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance()
+                                .cleanupQueryExecution(queryId),
+                        false);
+                    sessionManager.removeCurrSessionForMqtt(null); // handled 
via sessions map only
+                  }
+                })
+            .build();
+  }
+
+  /**
+   * Authenticates a user with username/password and returns a Bearer token.
+   *
+   * @param username the username
+   * @param password the password
+   * @param clientAddress the client's IP address
+   * @return the Bearer token if authentication succeeds
+   * @throws SecurityException if authentication fails
+   */
+  public String authenticate(String username, String password, String 
clientAddress) {
+    // Create a session for this client
+    IClientSession session = new InternalClientSession("FlightSQL-" + 
clientAddress);
+    session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+
+    // Register the session before login (MQTT pattern)
+    sessionManager.registerSessionForMqtt(session);
+
+    // Use SessionManager's login method
+    org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp loginResp =
+        sessionManager.login(
+            session,
+            username,
+            password,
+            java.time.ZoneId.systemDefault().getId(),
+            SessionManager.CURRENT_RPC_VERSION,
+            IoTDBConstant.ClientVersion.V_1_0,
+            IClientSession.SqlDialect.TABLE);
+
+    if (loginResp.getCode() != 
org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Remove the session if login failed
+      sessionManager.removeCurrSessionForMqtt(null);
+      throw new SecurityException("Authentication failed: " + 
loginResp.getMessage());
+    }
+
+    // Generate Bearer token and store in cache
+    String token = UUID.randomUUID().toString();
+    tokenCache.put(token, session);
+    LOGGER.info("Flight SQL user '{}' authenticated, session: {}", username, 
session);
+    return token;
+  }
+
+  /**
+   * Retrieves the IClientSession associated with a Bearer token from request 
headers.
+   *
+   * @param headers the Call headers containing the Authorization header
+   * @return the associated IClientSession
+   * @throws SecurityException if the token is invalid or expired
+   */
+  public IClientSession getSession(CallHeaders headers) {
+    String authHeader = headers.get(AUTHORIZATION_HEADER);
+    if (authHeader == null || !authHeader.startsWith(BEARER_PREFIX)) {
+      throw new SecurityException("Missing or invalid Authorization header");
+    }
+    String token = authHeader.substring(BEARER_PREFIX.length());
+    return getSessionByToken(token);
+  }
+
+  /**
+   * Retrieves the IClientSession associated with a Bearer token.
+   *
+   * @param token the Bearer token
+   * @return the associated IClientSession
+   * @throws SecurityException if the token is invalid or expired
+   */
+  public IClientSession getSessionByToken(String token) {
+    IClientSession session = tokenCache.getIfPresent(token);
+    if (session == null) {
+      throw new SecurityException("Invalid or expired Bearer token");
+    }
+    return session;
+  }
+
+  /** Invalidates all sessions and cleans up resources. */
+  public void close() {
+    tokenCache.invalidateAll();
+    tokenCache.cleanUp();
+  }
+}
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
new file mode 100644
index 00000000000..067ccfb20d4
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.protobuf.ByteString;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Apache Arrow Flight SQL producer implementation for IoTDB. Handles SQL 
query execution via the
+ * Arrow Flight SQL protocol, using the Table model SQL dialect.
+ */
+public class IoTDBFlightSqlProducer implements FlightSqlProducer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBFlightSqlProducer.class);
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private final BufferAllocator allocator;
+  private final FlightSqlSessionManager flightSessionManager;
+  private final Coordinator coordinator = Coordinator.getInstance();
+  private final SessionManager sessionManager = SessionManager.getInstance();
+  private final SqlParser sqlParser = new SqlParser();
+  private final Metadata metadata = 
LocalExecutionPlanner.getInstance().metadata;
+
+  /** Stores query execution context by queryId for streaming results via 
getStream. */
+  private final ConcurrentHashMap<Long, QueryContext> activeQueries = new 
ConcurrentHashMap<>();
+
+  public IoTDBFlightSqlProducer(
+      BufferAllocator allocator, FlightSqlSessionManager flightSessionManager) 
{
+    this.allocator = allocator;
+    this.flightSessionManager = flightSessionManager;
+  }
+
+  // ===================== Session Retrieval =====================
+
+  /**
+   * Retrieves the IClientSession for the current call. The auth2 framework 
sets the peer identity
+   * to the Bearer token after handshake.
+   */
+  private IClientSession getSessionFromContext(CallContext context) {
+    String peerIdentity = context.peerIdentity();
+    if (peerIdentity == null || peerIdentity.isEmpty()) {
+      throw CallStatus.UNAUTHENTICATED.withDescription("Not 
authenticated").toRuntimeException();
+    }
+    try {
+      return flightSessionManager.getSessionByToken(peerIdentity);
+    } catch (SecurityException e) {
+      throw 
CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
+    }
+  }
+
+  // ===================== SQL Query Execution =====================
+
+  @Override
+  public FlightInfo getFlightInfoStatement(
+      FlightSql.CommandStatementQuery command, CallContext context, 
FlightDescriptor descriptor) {
+    String sql = command.getQuery();
+    LOGGER.debug("getFlightInfoStatement: {}", sql);
+
+    IClientSession session = getSessionFromContext(context);
+
+    Long queryId = null;
+    try {
+      queryId = sessionManager.requestQueryId();
+
+      // Parse the SQL statement
+      Statement statement = sqlParser.createStatement(sql, 
ZoneId.systemDefault(), session);
+
+      // Execute via Coordinator (Table model)
+      ExecutionResult result =
+          coordinator.executeForTableModel(
+              statement,
+              sqlParser,
+              session,
+              queryId,
+              sessionManager.getSessionInfo(session),
+              sql,
+              metadata,
+              CONFIG.getQueryTimeoutThreshold(),
+              true);
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && result.status.code != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        throw CallStatus.INTERNAL
+            .withDescription("Query execution failed: " + 
result.status.getMessage())
+            .toRuntimeException();
+      }
+
+      IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
+      if (queryExecution == null) {
+        throw CallStatus.INTERNAL
+            .withDescription("Query execution not found after execution")
+            .toRuntimeException();
+      }
+
+      DatasetHeader header = queryExecution.getDatasetHeader();
+      Schema arrowSchema = TsBlockToArrowConverter.toArrowSchema(header);
+
+      // Store the query context for later getStream calls
+      activeQueries.put(queryId, new QueryContext(queryExecution, header, 
session));
+
+      // Build ticket containing the queryId
+      byte[] ticketBytes = 
Long.toString(queryId).getBytes(StandardCharsets.UTF_8);
+      Ticket ticket = new Ticket(ticketBytes);
+      FlightEndpoint endpoint = new FlightEndpoint(ticket);
+
+      return new FlightInfo(arrowSchema, descriptor, 
Collections.singletonList(endpoint), -1, -1);
+
+    } catch (RuntimeException e) {
+      // Cleanup on error
+      if (queryId != null) {
+        coordinator.cleanupQueryExecution(queryId);
+        activeQueries.remove(queryId);
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public SchemaResult getSchemaStatement(
+      FlightSql.CommandStatementQuery command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("getSchemaStatement not implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamStatement(
+      FlightSql.TicketStatementQuery ticketQuery,
+      CallContext context,
+      ServerStreamListener listener) {
+    ByteString handle = ticketQuery.getStatementHandle();
+    long queryId = Long.parseLong(handle.toStringUtf8());
+    streamQueryResults(queryId, listener);
+  }
+
+  /** Streams query results for a given queryId as Arrow VectorSchemaRoot 
batches. */
+  private void streamQueryResults(long queryId, ServerStreamListener listener) 
{
+    QueryContext ctx = activeQueries.get(queryId);
+    if (ctx == null) {
+      listener.error(
+          CallStatus.NOT_FOUND
+              .withDescription("Query not found for id: " + queryId)
+              .toRuntimeException());
+      return;
+    }
+
+    VectorSchemaRoot root = null;
+    try {
+      root = TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header, 
allocator);
+      listener.start(root);
+
+      while (true) {
+        Optional<TsBlock> optionalTsBlock = 
ctx.queryExecution.getBatchResult();
+        if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) {
+          break;
+        }
+
+        TsBlock tsBlock = optionalTsBlock.get();
+        root.clear();
+        TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, 
ctx.header);
+        listener.putNext();
+      }
+
+      listener.completed();
+    } catch (IoTDBException e) {
+      LOGGER.error("Error streaming query results for queryId={}", queryId, e);
+      
listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException());
+    } finally {
+      coordinator.cleanupQueryExecution(queryId);
+      activeQueries.remove(queryId);
+      if (root != null) {
+        root.close();
+      }
+    }
+  }
+
+  // ===================== Generic Flight Methods =====================
+
+  @Override
+  public void listFlights(
+      CallContext context, Criteria criteria, StreamListener<FlightInfo> 
listener) {
+    listener.onCompleted();
+  }
+
+  // ===================== Prepared Statements =====================
+
+  @Override
+  public void createPreparedStatement(
+      FlightSql.ActionCreatePreparedStatementRequest request,
+      CallContext context,
+      StreamListener<Result> listener) {
+    listener.onError(
+        CallStatus.UNIMPLEMENTED
+            .withDescription("Prepared statements are not yet supported")
+            .toRuntimeException());
+  }
+
+  @Override
+  public void closePreparedStatement(
+      FlightSql.ActionClosePreparedStatementRequest request,
+      CallContext context,
+      StreamListener<Result> listener) {
+    listener.onError(
+        CallStatus.UNIMPLEMENTED
+            .withDescription("Prepared statements are not yet supported")
+            .toRuntimeException());
+  }
+
+  @Override
+  public FlightInfo getFlightInfoPreparedStatement(
+      FlightSql.CommandPreparedStatementQuery command,
+      CallContext context,
+      FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Prepared statements are not yet supported")
+        .toRuntimeException();
+  }
+
+  @Override
+  public SchemaResult getSchemaPreparedStatement(
+      FlightSql.CommandPreparedStatementQuery command,
+      CallContext context,
+      FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Prepared statements are not yet supported")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamPreparedStatement(
+      FlightSql.CommandPreparedStatementQuery command,
+      CallContext context,
+      ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Prepared statements are not yet supported")
+        .toRuntimeException();
+  }
+
+  // ===================== DML/Update =====================
+
+  @Override
+  public Runnable acceptPutStatement(
+      FlightSql.CommandStatementUpdate command,
+      CallContext context,
+      FlightStream flightStream,
+      StreamListener<PutResult> ackStream) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Statement updates are not yet supported")
+        .toRuntimeException();
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementUpdate(
+      FlightSql.CommandPreparedStatementUpdate command,
+      CallContext context,
+      FlightStream flightStream,
+      StreamListener<PutResult> ackStream) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Prepared statements are not yet supported")
+        .toRuntimeException();
+  }
+
+  @Override
+  public Runnable acceptPutPreparedStatementQuery(
+      FlightSql.CommandPreparedStatementQuery command,
+      CallContext context,
+      FlightStream flightStream,
+      StreamListener<PutResult> ackStream) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("Prepared statements are not yet supported")
+        .toRuntimeException();
+  }
+
+  // ===================== Catalog/Schema/Table Metadata =====================
+
+  @Override
+  public FlightInfo getFlightInfoSqlInfo(
+      FlightSql.CommandGetSqlInfo command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetSqlInfo not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamSqlInfo(
+      FlightSql.CommandGetSqlInfo command, CallContext context, 
ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetSqlInfo not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoTypeInfo(
+      FlightSql.CommandGetXdbcTypeInfo command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetTypeInfo not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamTypeInfo(
+      FlightSql.CommandGetXdbcTypeInfo command,
+      CallContext context,
+      ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetTypeInfo not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoCatalogs(
+      FlightSql.CommandGetCatalogs command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetCatalogs not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamCatalogs(CallContext context, ServerStreamListener 
listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetCatalogs not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoSchemas(
+      FlightSql.CommandGetDbSchemas command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetSchemas not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamSchemas(
+      FlightSql.CommandGetDbSchemas command, CallContext context, 
ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetSchemas not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoTables(
+      FlightSql.CommandGetTables command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetTables not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamTables(
+      FlightSql.CommandGetTables command, CallContext context, 
ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetTables not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoTableTypes(
+      FlightSql.CommandGetTableTypes command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetTableTypes not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamTableTypes(CallContext context, ServerStreamListener 
listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetTableTypes not yet implemented")
+        .toRuntimeException();
+  }
+
+  // ===================== Keys =====================
+
+  @Override
+  public FlightInfo getFlightInfoPrimaryKeys(
+      FlightSql.CommandGetPrimaryKeys command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetPrimaryKeys not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamPrimaryKeys(
+      FlightSql.CommandGetPrimaryKeys command, CallContext context, 
ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetPrimaryKeys not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoExportedKeys(
+      FlightSql.CommandGetExportedKeys command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetExportedKeys not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamExportedKeys(
+      FlightSql.CommandGetExportedKeys command,
+      CallContext context,
+      ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetExportedKeys not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoImportedKeys(
+      FlightSql.CommandGetImportedKeys command, CallContext context, 
FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetImportedKeys not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamImportedKeys(
+      FlightSql.CommandGetImportedKeys command,
+      CallContext context,
+      ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetImportedKeys not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public FlightInfo getFlightInfoCrossReference(
+      FlightSql.CommandGetCrossReference command,
+      CallContext context,
+      FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetCrossReference not yet implemented")
+        .toRuntimeException();
+  }
+
+  @Override
+  public void getStreamCrossReference(
+      FlightSql.CommandGetCrossReference command,
+      CallContext context,
+      ServerStreamListener listener) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetCrossReference not yet implemented")
+        .toRuntimeException();
+  }
+
+  // ===================== Lifecycle =====================
+
+  @Override
+  public void close() throws Exception {
+    // Clean up all active queries
+    for (Long queryId : activeQueries.keySet()) {
+      try {
+        coordinator.cleanupQueryExecution(queryId);
+      } catch (Exception e) {
+        LOGGER.warn("Error cleaning up query {} during shutdown", queryId, e);
+      }
+    }
+    activeQueries.clear();
+  }
+
+  // ===================== Inner Classes =====================
+
+  /** Holds query execution context for streaming results. */
+  private static class QueryContext {
+
+    final IQueryExecution queryExecution;
+    final DatasetHeader header;
+    final IClientSession session;
+
+    QueryContext(IQueryExecution queryExecution, DatasetHeader header, 
IClientSession session) {
+      this.queryExecution = queryExecution;
+      this.header = header;
+      this.session = session;
+    }
+  }
+}
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
new file mode 100644
index 00000000000..396f230d0b9
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Converts IoTDB TsBlock data to Apache Arrow VectorSchemaRoot format. */
+public class TsBlockToArrowConverter {
+
+  private TsBlockToArrowConverter() {
+    // utility class
+  }
+
+  /** Maps IoTDB TSDataType to Apache Arrow ArrowType. */
+  public static ArrowType toArrowType(TSDataType tsDataType) {
+    switch (tsDataType) {
+      case BOOLEAN:
+        return ArrowType.Bool.INSTANCE;
+      case INT32:
+        return new ArrowType.Int(32, true);
+      case INT64:
+        return new ArrowType.Int(64, true);
+      case FLOAT:
+        return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+      case DOUBLE:
+        return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+      case TEXT:
+      case STRING:
+        return ArrowType.Utf8.INSTANCE;
+      case BLOB:
+        return ArrowType.Binary.INSTANCE;
+      case TIMESTAMP:
+        return new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC");
+      case DATE:
+        return new ArrowType.Date(DateUnit.DAY);
+      default:
+        throw new IllegalArgumentException("Unsupported TSDataType: " + 
tsDataType);
+    }
+  }
+
+  /** Builds an Arrow Schema from an IoTDB DatasetHeader. */
+  public static Schema toArrowSchema(DatasetHeader header) {
+    List<String> columnNames = header.getRespColumns();
+    List<TSDataType> dataTypes = header.getRespDataTypes();
+    List<Field> fields = new ArrayList<>(columnNames.size());
+    for (int i = 0; i < columnNames.size(); i++) {
+      ArrowType arrowType = toArrowType(dataTypes.get(i));
+      fields.add(new Field(columnNames.get(i), new FieldType(true, arrowType, 
null), null));
+    }
+    return new Schema(fields);
+  }
+
+  /** Creates a new VectorSchemaRoot from an IoTDB DatasetHeader. */
+  public static VectorSchemaRoot createVectorSchemaRoot(
+      DatasetHeader header, BufferAllocator allocator) {
+    Schema arrowSchema = toArrowSchema(header);
+    return VectorSchemaRoot.create(arrowSchema, allocator);
+  }
+
+  /**
+   * Fills an existing VectorSchemaRoot with data from a TsBlock.
+   *
+   * @param root the VectorSchemaRoot to fill (cleared before filling)
+   * @param tsBlock the TsBlock containing the data
+   * @param header the DatasetHeader for column name to index mapping
+   */
+  public static void fillVectorSchemaRoot(
+      VectorSchemaRoot root, TsBlock tsBlock, DatasetHeader header) {
+
+    int rowCount = tsBlock.getPositionCount();
+    List<String> columnNames = header.getRespColumns();
+    List<TSDataType> dataTypes = header.getRespDataTypes();
+    Map<String, Integer> headerMap = header.getColumnNameIndexMap();
+
+    root.allocateNew();
+
+    for (int colIdx = 0; colIdx < columnNames.size(); colIdx++) {
+      String colName = columnNames.get(colIdx);
+      Integer sourceIdx = headerMap.get(colName);
+      Column column = tsBlock.getColumn(sourceIdx);
+      TSDataType dataType = dataTypes.get(colIdx);
+      FieldVector fieldVector = root.getVector(colIdx);
+
+      fillColumnVector(fieldVector, column, dataType, rowCount);
+    }
+
+    root.setRowCount(rowCount);
+  }
+
+  /** Fills an Arrow FieldVector from an IoTDB Column. */
+  private static void fillColumnVector(
+      FieldVector fieldVector, Column column, TSDataType dataType, int 
rowCount) {
+    switch (dataType) {
+      case BOOLEAN:
+        fillBooleanVector((BitVector) fieldVector, column, rowCount);
+        break;
+      case INT32:
+        fillInt32Vector((IntVector) fieldVector, column, rowCount);
+        break;
+      case INT64:
+        fillInt64Vector((BigIntVector) fieldVector, column, rowCount);
+        break;
+      case FLOAT:
+        fillFloatVector((Float4Vector) fieldVector, column, rowCount);
+        break;
+      case DOUBLE:
+        fillDoubleVector((Float8Vector) fieldVector, column, rowCount);
+        break;
+      case TEXT:
+      case STRING:
+        fillTextVector((VarCharVector) fieldVector, column, rowCount);
+        break;
+      case BLOB:
+        fillBlobVector((VarBinaryVector) fieldVector, column, rowCount);
+        break;
+      case TIMESTAMP:
+        fillTimestampVector((TimeStampMilliTZVector) fieldVector, column, 
rowCount);
+        break;
+      case DATE:
+        fillDateVector((DateDayVector) fieldVector, column, rowCount);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported TSDataType: " + 
dataType);
+    }
+  }
+
+  private static void fillBooleanVector(BitVector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getBoolean(i) ? 1 : 0);
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillInt32Vector(IntVector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getInt(i));
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillInt64Vector(BigIntVector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getLong(i));
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillFloatVector(Float4Vector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getFloat(i));
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillDoubleVector(Float8Vector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getDouble(i));
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillTextVector(VarCharVector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        byte[] bytes =
+            column
+                .getBinary(i)
+                .getStringValue(TSFileConfig.STRING_CHARSET)
+                .getBytes(StandardCharsets.UTF_8);
+        vector.setSafe(i, bytes);
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillBlobVector(VarBinaryVector vector, Column column, 
int rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getBinary(i).getValues());
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillTimestampVector(
+      TimeStampMilliTZVector vector, Column column, int rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getLong(i));
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+
+  private static void fillDateVector(DateDayVector vector, Column column, int 
rowCount) {
+    for (int i = 0; i < rowCount; i++) {
+      if (column.isNull(i)) {
+        vector.setNull(i);
+      } else {
+        vector.setSafe(i, column.getInt(i));
+      }
+    }
+    vector.setValueCount(rowCount);
+  }
+}
diff --git 
a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java
 
b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java
new file mode 100644
index 00000000000..413e17e23b7
--- /dev/null
+++ 
b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.flight;
+
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/** Unit tests for TsBlockToArrowConverter. */
+public class TsBlockToArrowConverterTest {
+
+  private BufferAllocator allocator;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  // ===================== toArrowType Tests =====================
+
+  @Test
+  public void testToArrowTypeMappings() {
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.BOOLEAN) 
instanceof ArrowType.Bool);
+
+    ArrowType int32Type = 
TsBlockToArrowConverter.toArrowType(TSDataType.INT32);
+    assertTrue(int32Type instanceof ArrowType.Int);
+    assertEquals(32, ((ArrowType.Int) int32Type).getBitWidth());
+    assertTrue(((ArrowType.Int) int32Type).getIsSigned());
+
+    ArrowType int64Type = 
TsBlockToArrowConverter.toArrowType(TSDataType.INT64);
+    assertTrue(int64Type instanceof ArrowType.Int);
+    assertEquals(64, ((ArrowType.Int) int64Type).getBitWidth());
+    assertTrue(((ArrowType.Int) int64Type).getIsSigned());
+
+    ArrowType floatType = 
TsBlockToArrowConverter.toArrowType(TSDataType.FLOAT);
+    assertTrue(floatType instanceof ArrowType.FloatingPoint);
+    assertEquals(
+        FloatingPointPrecision.SINGLE, ((ArrowType.FloatingPoint) 
floatType).getPrecision());
+
+    ArrowType doubleType = 
TsBlockToArrowConverter.toArrowType(TSDataType.DOUBLE);
+    assertTrue(doubleType instanceof ArrowType.FloatingPoint);
+    assertEquals(
+        FloatingPointPrecision.DOUBLE, ((ArrowType.FloatingPoint) 
doubleType).getPrecision());
+
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.TEXT) instanceof 
ArrowType.Utf8);
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.STRING) 
instanceof ArrowType.Utf8);
+    assertTrue(TsBlockToArrowConverter.toArrowType(TSDataType.BLOB) instanceof 
ArrowType.Binary);
+
+    ArrowType tsType = 
TsBlockToArrowConverter.toArrowType(TSDataType.TIMESTAMP);
+    assertTrue(tsType instanceof ArrowType.Timestamp);
+    assertEquals(TimeUnit.MILLISECOND, ((ArrowType.Timestamp) 
tsType).getUnit());
+    assertEquals("UTC", ((ArrowType.Timestamp) tsType).getTimezone());
+
+    ArrowType dateType = TsBlockToArrowConverter.toArrowType(TSDataType.DATE);
+    assertTrue(dateType instanceof ArrowType.Date);
+    assertEquals(DateUnit.DAY, ((ArrowType.Date) dateType).getUnit());
+  }
+
+  // ===================== toArrowSchema Tests =====================
+
+  @Test
+  public void testToArrowSchema() {
+    DatasetHeader header = buildHeader("col_bool", TSDataType.BOOLEAN, 
"col_int", TSDataType.INT32);
+    Schema schema = TsBlockToArrowConverter.toArrowSchema(header);
+
+    assertEquals(2, schema.getFields().size());
+    assertEquals("col_bool", schema.getFields().get(0).getName());
+    assertTrue(schema.getFields().get(0).getType() instanceof ArrowType.Bool);
+    assertEquals("col_int", schema.getFields().get(1).getName());
+    assertTrue(schema.getFields().get(1).getType() instanceof ArrowType.Int);
+  }
+
+  // ===================== INT32 Conversion =====================
+
+  @Test
+  public void testFillInt32Values() {
+    DatasetHeader header = buildHeader("value", TSDataType.INT32);
+    TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {10, 20, 
30});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      IntVector vector = (IntVector) root.getVector(0);
+      assertEquals(10, vector.get(0));
+      assertEquals(20, vector.get(1));
+      assertEquals(30, vector.get(2));
+    }
+  }
+
+  @Test
+  public void testFillInt32WithNulls() {
+    DatasetHeader header = buildHeader("value", TSDataType.INT32);
+    TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {10, null, 
30});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      IntVector vector = (IntVector) root.getVector(0);
+      assertEquals(10, vector.get(0));
+      assertTrue(vector.isNull(1));
+      assertEquals(30, vector.get(2));
+    }
+  }
+
+  // ===================== INT64 Conversion =====================
+
+  @Test
+  public void testFillInt64Values() {
+    DatasetHeader header = buildHeader("value", TSDataType.INT64);
+    TsBlock tsBlock = buildTsBlock(TSDataType.INT64, new Object[] {100L, 200L, 
300L});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      BigIntVector vector = (BigIntVector) root.getVector(0);
+      assertEquals(100L, vector.get(0));
+      assertEquals(200L, vector.get(1));
+      assertEquals(300L, vector.get(2));
+    }
+  }
+
+  // ===================== FLOAT Conversion =====================
+
+  @Test
+  public void testFillFloatValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.FLOAT);
+    TsBlock tsBlock = buildTsBlock(TSDataType.FLOAT, new Object[] {1.1f, 2.2f, 
3.3f});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      Float4Vector vector = (Float4Vector) root.getVector(0);
+      assertEquals(1.1f, vector.get(0), 0.001f);
+      assertEquals(2.2f, vector.get(1), 0.001f);
+      assertEquals(3.3f, vector.get(2), 0.001f);
+    }
+  }
+
+  // ===================== DOUBLE Conversion =====================
+
+  @Test
+  public void testFillDoubleValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.DOUBLE);
+    TsBlock tsBlock = buildTsBlock(TSDataType.DOUBLE, new Object[] {1.11, 
2.22, 3.33});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      Float8Vector vector = (Float8Vector) root.getVector(0);
+      assertEquals(1.11, vector.get(0), 0.0001);
+      assertEquals(2.22, vector.get(1), 0.0001);
+      assertEquals(3.33, vector.get(2), 0.0001);
+    }
+  }
+
+  // ===================== BOOLEAN Conversion =====================
+
+  @Test
+  public void testFillBooleanValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.BOOLEAN);
+    TsBlock tsBlock = buildTsBlock(TSDataType.BOOLEAN, new Object[] {true, 
false, true});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      BitVector vector = (BitVector) root.getVector(0);
+      assertEquals(1, vector.get(0));
+      assertEquals(0, vector.get(1));
+      assertEquals(1, vector.get(2));
+    }
+  }
+
+  @Test
+  public void testFillBooleanWithNulls() {
+    DatasetHeader header = buildHeader("value", TSDataType.BOOLEAN);
+    TsBlock tsBlock = buildTsBlock(TSDataType.BOOLEAN, new Object[] {true, 
null, false});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      BitVector vector = (BitVector) root.getVector(0);
+      assertEquals(1, vector.get(0));
+      assertTrue(vector.isNull(1));
+      assertEquals(0, vector.get(2));
+    }
+  }
+
+  // ===================== TEXT/STRING Conversion =====================
+
+  @Test
+  public void testFillTextValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.TEXT);
+    TsBlock tsBlock =
+        buildTsBlock(
+            TSDataType.TEXT,
+            new Object[] {
+              new Binary("hello", StandardCharsets.UTF_8),
+              new Binary("world", StandardCharsets.UTF_8),
+              new Binary("IoTDB", StandardCharsets.UTF_8)
+            });
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      VarCharVector vector = (VarCharVector) root.getVector(0);
+      assertEquals("hello", new String(vector.get(0), StandardCharsets.UTF_8));
+      assertEquals("world", new String(vector.get(1), StandardCharsets.UTF_8));
+      assertEquals("IoTDB", new String(vector.get(2), StandardCharsets.UTF_8));
+    }
+  }
+
+  @Test
+  public void testFillTextWithNulls() {
+    DatasetHeader header = buildHeader("value", TSDataType.TEXT);
+    TsBlock tsBlock =
+        buildTsBlock(
+            TSDataType.TEXT,
+            new Object[] {new Binary("hello", StandardCharsets.UTF_8), null, 
null});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      VarCharVector vector = (VarCharVector) root.getVector(0);
+      assertEquals("hello", new String(vector.get(0), StandardCharsets.UTF_8));
+      assertTrue(vector.isNull(1));
+      assertTrue(vector.isNull(2));
+    }
+  }
+
+  // ===================== BLOB Conversion =====================
+
+  @Test
+  public void testFillBlobValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.BLOB);
+    byte[] bytes1 = {0x01, 0x02, 0x03};
+    byte[] bytes2 = {0x04, 0x05};
+    TsBlock tsBlock =
+        buildTsBlock(TSDataType.BLOB, new Object[] {new Binary(bytes1), new 
Binary(bytes2)});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(2, root.getRowCount());
+      VarBinaryVector vector = (VarBinaryVector) root.getVector(0);
+      assertArrayEquals(bytes1, vector.get(0));
+      assertArrayEquals(bytes2, vector.get(1));
+    }
+  }
+
+  // ===================== TIMESTAMP Conversion =====================
+
+  @Test
+  public void testFillTimestampValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.TIMESTAMP);
+    TsBlock tsBlock =
+        buildTsBlock(
+            TSDataType.TIMESTAMP, new Object[] {1609459200000L, 
1609459260000L, 1609459320000L});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      TimeStampMilliTZVector vector = (TimeStampMilliTZVector) 
root.getVector(0);
+      assertEquals(1609459200000L, vector.get(0));
+      assertEquals(1609459260000L, vector.get(1));
+      assertEquals(1609459320000L, vector.get(2));
+    }
+  }
+
+  // ===================== DATE Conversion =====================
+
+  @Test
+  public void testFillDateValues() {
+    DatasetHeader header = buildHeader("value", TSDataType.DATE);
+    // Epoch days: 2021-01-01 = 18628
+    TsBlock tsBlock = buildTsBlock(TSDataType.DATE, new Object[] {18628, 
18629, 18630});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(3, root.getRowCount());
+      DateDayVector vector = (DateDayVector) root.getVector(0);
+      assertEquals(18628, vector.get(0));
+      assertEquals(18629, vector.get(1));
+      assertEquals(18630, vector.get(2));
+    }
+  }
+
+  // ===================== Multi-Column Tests =====================
+
+  @Test
+  public void testFillMultipleColumns() {
+    List<ColumnHeader> headers = new ArrayList<>();
+    headers.add(new ColumnHeader("int_col", TSDataType.INT32));
+    headers.add(new ColumnHeader("double_col", TSDataType.DOUBLE));
+    headers.add(new ColumnHeader("text_col", TSDataType.TEXT));
+    DatasetHeader header = new DatasetHeader(headers, true);
+    List<String> colNames = header.getRespColumns();
+    header.setTreeColumnToTsBlockIndexMap(colNames);
+
+    List<TSDataType> types = Arrays.asList(TSDataType.INT32, 
TSDataType.DOUBLE, TSDataType.TEXT);
+    TsBlockBuilder builder = new TsBlockBuilder(types);
+    ColumnBuilder[] cols = builder.getValueColumnBuilders();
+
+    // Row 1
+    cols[0].writeInt(42);
+    cols[1].writeDouble(3.14);
+    cols[2].writeBinary(new Binary("test", StandardCharsets.UTF_8));
+    builder.declarePosition();
+
+    // Row 2
+    cols[0].writeInt(100);
+    cols[1].writeDouble(2.71);
+    cols[2].writeBinary(new Binary("hello", StandardCharsets.UTF_8));
+    builder.declarePosition();
+
+    TsBlock tsBlock =
+        builder.build(
+            new RunLengthEncodedColumn(
+                new TimeColumn(1, new long[] {0L}), 
builder.getPositionCount()));
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+
+      assertEquals(2, root.getRowCount());
+
+      IntVector intVec = (IntVector) root.getVector("int_col");
+      assertEquals(42, intVec.get(0));
+      assertEquals(100, intVec.get(1));
+
+      Float8Vector doubleVec = (Float8Vector) root.getVector("double_col");
+      assertEquals(3.14, doubleVec.get(0), 0.001);
+      assertEquals(2.71, doubleVec.get(1), 0.001);
+
+      VarCharVector textVec = (VarCharVector) root.getVector("text_col");
+      assertEquals("test", new String(textVec.get(0), StandardCharsets.UTF_8));
+      assertEquals("hello", new String(textVec.get(1), 
StandardCharsets.UTF_8));
+    }
+  }
+
+  // ===================== Empty TsBlock =====================
+
+  @Test
+  public void testFillEmptyTsBlock() {
+    DatasetHeader header = buildHeader("value", TSDataType.INT32);
+    TsBlock tsBlock = buildTsBlock(TSDataType.INT32, new Object[] {});
+
+    try (VectorSchemaRoot root =
+        TsBlockToArrowConverter.createVectorSchemaRoot(header, allocator)) {
+      TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock, header);
+      assertEquals(0, root.getRowCount());
+    }
+  }
+
+  // ===================== Helper Methods =====================
+
+  /** Build a simple DatasetHeader with one or more columns (name, type 
pairs). */
+  private DatasetHeader buildHeader(Object... nameTypePairs) {
+    List<ColumnHeader> columnHeaders = new ArrayList<>();
+    for (int i = 0; i < nameTypePairs.length; i += 2) {
+      columnHeaders.add(
+          new ColumnHeader((String) nameTypePairs[i], (TSDataType) 
nameTypePairs[i + 1]));
+    }
+    DatasetHeader header = new DatasetHeader(columnHeaders, true);
+    List<String> colNames = header.getRespColumns();
+    header.setTreeColumnToTsBlockIndexMap(colNames);
+    return header;
+  }
+
+  /**
+   * Build a single-column TsBlock with the given data type and values. Null 
values in the array
+   * produce null entries in the TsBlock.
+   */
+  private TsBlock buildTsBlock(TSDataType type, Object[] values) {
+    TsBlockBuilder builder = new 
TsBlockBuilder(java.util.Collections.singletonList(type));
+    ColumnBuilder[] cols = builder.getValueColumnBuilders();
+
+    for (Object value : values) {
+      if (value == null) {
+        cols[0].appendNull();
+      } else {
+        switch (type) {
+          case BOOLEAN:
+            cols[0].writeBoolean((Boolean) value);
+            break;
+          case INT32:
+          case DATE:
+            cols[0].writeInt((Integer) value);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            cols[0].writeLong((Long) value);
+            break;
+          case FLOAT:
+            cols[0].writeFloat((Float) value);
+            break;
+          case DOUBLE:
+            cols[0].writeDouble((Double) value);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            cols[0].writeBinary((Binary) value);
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported type: " + type);
+        }
+      }
+      builder.declarePosition();
+    }
+
+    return builder.build(
+        new RunLengthEncodedColumn(new TimeColumn(1, new long[] {0L}), 
builder.getPositionCount()));
+  }
+}
diff --git a/external-service-impl/pom.xml b/external-service-impl/pom.xml
index 04483b289f3..511a58f2bef 100644
--- a/external-service-impl/pom.xml
+++ b/external-service-impl/pom.xml
@@ -33,6 +33,7 @@
         <module>mqtt</module>
         <module>rest</module>
         <module>rest-openapi</module>
+        <module>flight-sql</module>
     </modules>
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index a4776a57c55..4ef730957e4 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -246,6 +246,16 @@
             <!--We will integrate rest-jar-with-dependencies into lib by 
assembly plugin-->
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>flight-sql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 82ed7976959..114556c8b68 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -170,6 +170,7 @@ public class ClusterConstant {
   public static final String PIPE_LIB_DIR = "pipe_lib_dir";
   public static final String REST_SERVICE_PORT = "rest_service_port";
   public static final String INFLUXDB_RPC_PORT = "influxdb_rpc_port";
+  public static final String ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port";
 
   // ConfigNode
   public static final String CN_SYSTEM_DIR = "cn_system_dir";
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 53aa395491b..d043cc4fbb2 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -315,6 +315,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService) {
+    setProperty("enable_arrow_flight_sql_service", 
String.valueOf(enableArrowFlightSqlService));
+    return this;
+  }
+
   @Override
   public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
     setProperty("mqtt_payload_formatter", 
String.valueOf(mqttPayloadFormatter));
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 5e418072a7d..8fcb242ca12 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -102,6 +102,12 @@ public class MppDataNodeConfig extends MppBaseConfig 
implements DataNodeConfig {
     return this;
   }
 
+  @Override
+  public DataNodeConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService) {
+    setProperty("enable_arrow_flight_sql_service", 
String.valueOf(enableArrowFlightSqlService));
+    return this;
+  }
+
   @Override
   public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
     setProperty("mqtt_payload_formatter", 
String.valueOf(mqttPayloadFormatter));
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index ed8a8943303..b73b55510c0 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -316,6 +316,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService) {
+    cnConfig.setEnableArrowFlightSqlService(enableArrowFlightSqlService);
+    dnConfig.setEnableArrowFlightSqlService(enableArrowFlightSqlService);
+    return this;
+  }
+
   @Override
   public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
     cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 8308225a9af..d223b1e32ee 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -955,7 +955,8 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   // use this to avoid some runtimeExceptions when try to get jdbc connections.
-  // because it is hard to add retry and handle exception when getting jdbc 
connections in
+  // because it is hard to add retry and handle exception when getting jdbc
+  // connections in
   // getWriteConnectionWithSpecifiedDataNode and getReadConnections.
   // so use this function to add retry when cluster is ready.
   // after retryCount times, if the jdbc can't connect, throw
@@ -1426,7 +1427,8 @@ public abstract class AbstractEnv implements BaseEnv {
           final String endpoint = nodes.get(j).getIpAndPortString();
           if (!nodeIds.containsKey(endpoint)) {
             // Node not exist
-            // Notice: Never modify this line, since the NodeLocation might be 
modified in IT
+            // Notice: Never modify this line, since the NodeLocation might be 
modified in
+            // IT
             errorMessages.add("The node " + nodes.get(j).getIpAndPortString() 
+ " is not found!");
             continue;
           }
@@ -1463,6 +1465,13 @@ public abstract class AbstractEnv implements BaseEnv {
         .getMqttPort();
   }
 
+  @Override
+  public int getArrowFlightSqlPort() {
+    return dataNodeWrapperList
+        .get(new 
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()))
+        .getArrowFlightSqlPort();
+  }
+
   @Override
   public String getIP() {
     return dataNodeWrapperList.get(0).getIp();
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index 96a0fbe27e0..c4111c0f5f6 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
+import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.ARROW_FLIGHT_SQL_PORT;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.DATANODE_INIT_HEAP_SIZE;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.DATANODE_MAX_DIRECT_MEMORY_SIZE;
@@ -78,6 +79,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   private final int dataRegionConsensusPort;
   private final int schemaRegionConsensusPort;
   private final int mqttPort;
+  private final int arrowFlightSqlPort;
   private final int restServicePort;
   private final int pipeAirGapReceiverPort;
 
@@ -103,6 +105,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     this.schemaRegionConsensusPort = portList[4];
     this.mqttPort = portList[5];
     this.pipeAirGapReceiverPort = portList[6];
+    this.arrowFlightSqlPort = portList[7];
     this.restServicePort = portList[10] + 6000;
     this.defaultNodePropertiesFile =
         EnvUtils.getFilePathFromSysVar(DEFAULT_DATA_NODE_PROPERTIES, 
clusterIndex);
@@ -120,6 +123,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
         MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData");
     immutableCommonProperties.setProperty(
         PIPE_AIR_GAP_RECEIVER_PORT, 
String.valueOf(this.pipeAirGapReceiverPort));
+    immutableCommonProperties.setProperty(
+        ARROW_FLIGHT_SQL_PORT, String.valueOf(this.arrowFlightSqlPort));
 
     immutableNodeProperties.setProperty(REST_SERVICE_PORT, 
String.valueOf(restServicePort));
 
@@ -304,6 +309,10 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     return pipeAirGapReceiverPort;
   }
 
+  public int getArrowFlightSqlPort() {
+    return arrowFlightSqlPort;
+  }
+
   public int getRestServicePort() {
     return restServicePort;
   }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0c05af17f6a..db1a713ee09 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -221,6 +221,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService) {
+    return this;
+  }
+
   @Override
   public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index bba4c964f95..d4526ad2834 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -64,6 +64,11 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
     return this;
   }
 
+  @Override
+  public DataNodeConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService) {
+    return this;
+  }
+
   @Override
   public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 586eff60494..5ad15d43e44 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -480,6 +480,11 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public int getArrowFlightSqlPort() {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public String getIP() {
     return ip_addr;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 8b32deb3ea8..43175676ab8 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -332,6 +332,8 @@ public interface BaseEnv {
 
   int getMqttPort();
 
+  int getArrowFlightSqlPort();
+
   String getIP();
 
   String getPort();
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 95ac239e89b..d5af1068ec9 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -102,6 +102,8 @@ public interface CommonConfig {
 
   CommonConfig setEnableMQTTService(boolean enableMQTTService);
 
+  CommonConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService);
+
   CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
 
   CommonConfig setSchemaEngineMode(String schemaEngineMode);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index d57015b1396..9cbdf393e8d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -40,6 +40,8 @@ public interface DataNodeConfig {
 
   DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
 
+  DataNodeConfig setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService);
+
   DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
 
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
new file mode 100644
index 00000000000..d931ca77999
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.relational.it.flightsql;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.auth2.BasicAuthCredentialWriter;
+import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
+import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for Arrow Flight SQL service in IoTDB. Tests the 
end-to-end flow: client
+ * connects via Flight SQL protocol, authenticates, executes SQL queries, and 
receives
+ * Arrow-formatted results.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBArrowFlightSqlIT {
+
+  private static final String DATABASE = "flightsql_test_db";
+  private static final String USER = "root";
+  private static final String PASSWORD = "root";
+
+  private BufferAllocator allocator;
+  private FlightClient flightClient;
+  private FlightSqlClient flightSqlClient;
+  private CredentialCallOption bearerToken;
+
+  @Before
+  public void setUp() throws Exception {
+    // Configure and start the cluster with Arrow Flight SQL enabled
+    BaseEnv baseEnv = EnvFactory.getEnv();
+    
baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true);
+    baseEnv.initClusterEnvironment();
+
+    // Get the Flight SQL port from the data node
+    int port = EnvFactory.getEnv().getArrowFlightSqlPort();
+
+    // Create Arrow allocator and Flight client with Bearer token auth 
middleware
+    allocator = new RootAllocator(Long.MAX_VALUE);
+    Location location = Location.forGrpcInsecure("127.0.0.1", port);
+
+    // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from 
the
+    // auth handshake
+    ClientIncomingAuthHeaderMiddleware.Factory authFactory =
+        new ClientIncomingAuthHeaderMiddleware.Factory(new 
ClientBearerHeaderHandler());
+
+    flightClient = FlightClient.builder(allocator, 
location).intercept(authFactory).build();
+
+    // Authenticate: sends Basic credentials, server returns Bearer token
+    bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, 
PASSWORD));
+
+    // Wrap in FlightSqlClient for Flight SQL protocol operations
+    flightSqlClient = new FlightSqlClient(flightClient);
+
+    // Use the standard session to create the test database and table with data
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnection()) {
+      session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " + 
DATABASE);
+    }
+    try (ITableSession session = 
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
+      session.executeNonQueryStatement(
+          "CREATE TABLE test_table ("
+              + "id1 STRING TAG, "
+              + "s1 INT32 FIELD, "
+              + "s2 INT64 FIELD, "
+              + "s3 FLOAT FIELD, "
+              + "s4 DOUBLE FIELD, "
+              + "s5 BOOLEAN FIELD, "
+              + "s6 TEXT FIELD)");
+      session.executeNonQueryStatement(
+          "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) "
+              + "VALUES(1, 'device1', 100, 1000, 1.5, 2.5, true, 'hello')");
+      session.executeNonQueryStatement(
+          "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) "
+              + "VALUES(2, 'device1', 200, 2000, 3.5, 4.5, false, 'world')");
+      session.executeNonQueryStatement(
+          "INSERT INTO test_table(time, id1, s1, s2, s3, s4, s5, s6) "
+              + "VALUES(3, 'device2', 300, 3000, 5.5, 6.5, true, 'iotdb')");
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (flightSqlClient != null) {
+      try {
+        flightSqlClient.close();
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+    if (flightClient != null) {
+      try {
+        flightClient.close();
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+    if (allocator != null) {
+      allocator.close();
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testQueryWithAllDataTypes() throws Exception {
+    FlightInfo flightInfo =
+        flightSqlClient.execute(
+            "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY 
time", bearerToken);
+
+    // Validate schema
+    Schema schema = flightInfo.getSchema();
+    assertNotNull("Schema should not be null", schema);
+    List<Field> fields = schema.getFields();
+    assertEquals("Should have 8 columns", 8, fields.size());
+
+    // Fetch all data
+    List<List<String>> rows = fetchAllRows(flightInfo);
+    assertEquals("Should have 3 rows", 3, rows.size());
+  }
+
+  @Test
+  public void testQueryWithFilter() throws Exception {
+    FlightInfo flightInfo =
+        flightSqlClient.execute(
+            "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY 
time", bearerToken);
+
+    List<List<String>> rows = fetchAllRows(flightInfo);
+    assertEquals("Should have 2 rows for device1", 2, rows.size());
+  }
+
+  @Test
+  public void testQueryWithAggregation() throws Exception {
+    FlightInfo flightInfo =
+        flightSqlClient.execute(
+            "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum "
+                + "FROM test_table GROUP BY id1 ORDER BY id1",
+            bearerToken);
+
+    List<List<String>> rows = fetchAllRows(flightInfo);
+    assertEquals("Should have 2 groups", 2, rows.size());
+  }
+
+  @Test
+  public void testEmptyResult() throws Exception {
+    FlightInfo flightInfo =
+        flightSqlClient.execute("SELECT * FROM test_table WHERE id1 = 
'nonexistent'", bearerToken);
+
+    List<List<String>> rows = fetchAllRows(flightInfo);
+    assertEquals("Should have 0 rows", 0, rows.size());
+  }
+
+  @Test
+  public void testShowDatabases() throws Exception {
+    FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", 
bearerToken);
+
+    List<List<String>> rows = fetchAllRows(flightInfo);
+    assertTrue("Should have at least 1 database", rows.size() >= 1);
+
+    boolean found = false;
+    for (List<String> row : rows) {
+      for (String val : row) {
+        if (val.contains(DATABASE)) {
+          found = true;
+          break;
+        }
+      }
+    }
+    assertTrue("Should find test database " + DATABASE, found);
+  }
+
+  /**
+   * Fetches all rows from all endpoints in a FlightInfo. Each row is a list 
of string
+   * representations of the column values.
+   */
+  private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws 
Exception {
+    List<List<String>> rows = new ArrayList<>();
+    for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+      try (FlightStream stream = 
flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) {
+        while (stream.next()) {
+          VectorSchemaRoot root = stream.getRoot();
+          int rowCount = root.getRowCount();
+          for (int i = 0; i < rowCount; i++) {
+            List<String> row = new ArrayList<>();
+            for (FieldVector vector : root.getFieldVectors()) {
+              Object value = vector.getObject(i);
+              row.add(value == null ? "null" : value.toString());
+            }
+            rows.add(row);
+          }
+        }
+      }
+    }
+    return rows;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 98c15a2d9bf..3fef5d79fe1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -95,7 +95,7 @@ public class IoTDBConfig {
   // e.g., a31+/$%#&[]{}3e4, "a.b", 'a.b'
   private static final String NODE_NAME_MATCHER = "([^\n\t]+)";
 
-  // e.g.,  .s1
+  // e.g., .s1
   private static final String PARTIAL_NODE_MATCHER = "[" + PATH_SEPARATOR + 
"]" + NODE_NAME_MATCHER;
 
   private static final String NODE_MATCHER =
@@ -106,6 +106,12 @@ public class IoTDBConfig {
   /** Whether to enable the mqtt service. */
   private boolean enableMQTTService = false;
 
+  /** Whether to enable the Arrow Flight SQL service. */
+  private boolean enableArrowFlightSqlService = false;
+
+  /** The Arrow Flight SQL service binding port. */
+  private int arrowFlightSqlPort = 8904;
+
   /** The mqtt service binding host. */
   private String mqttHost = "127.0.0.1";
 
@@ -703,14 +709,16 @@ public class IoTDBConfig {
   private int compactionMaxAlignedSeriesNumInOneBatch = 10;
 
   /*
-   * How many thread will be set up to perform continuous queries. When <= 0, 
use max(1, CPU core number / 2).
+   * How many thread will be set up to perform continuous queries. When <= 0, 
use
+   * max(1, CPU core number / 2).
    */
   private int continuousQueryThreadNum =
       Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
 
   /*
    * Minimum every interval to perform continuous query.
-   * The every interval of continuous query instances should not be lower than 
this limit.
+   * The every interval of continuous query instances should not be lower than
+   * this limit.
    */
   private long continuousQueryMinimumEveryInterval = 1000;
 
@@ -789,7 +797,8 @@ public class IoTDBConfig {
   private int tagAttributeFlushInterval = 1000;
 
   // In one insert (one device, one timestamp, multiple measurements),
-  // if enable partial insert, one measurement failure will not impact other 
measurements
+  // if enable partial insert, one measurement failure will not impact other
+  // measurements
   private boolean enablePartialInsert = true;
 
   private boolean enable13DataInsertAdapt = false;
@@ -1108,7 +1117,8 @@ public class IoTDBConfig {
   private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
   private int loadTsFileAnalyzeSchemaBatchFlushTableDeviceNumber = 4096; // 
For table model
   private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
-      0L; // 0 means that the decision will be adaptive based on the number of 
sequences
+      0L; // 0 means that the decision will be adaptive based on the
+  // number of sequences
 
   private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
 
@@ -1449,7 +1459,8 @@ public class IoTDBConfig {
     SystemMetrics.getInstance().setDiskDirs(diskDirs);
   }
 
-  // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the 
same with IOTDB_HOME
+  // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the 
same
+  // with IOTDB_HOME
   // In this way, we can keep consistent with v0.13.0~2.
   public static String addDataHomeDir(final String dir) {
     String dataHomeDir = System.getProperty(IoTDBConstant.IOTDB_DATA_HOME, 
null);
@@ -1518,7 +1529,8 @@ public class IoTDBConfig {
   public void setTierDataDirs(String[][] tierDataDirs) {
     formulateDataDirs(tierDataDirs);
     this.tierDataDirs = tierDataDirs;
-    // TODO(szywilliam): rewrite the logic here when ratis supports complete 
snapshot semantic
+    // TODO(szywilliam): rewrite the logic here when ratis supports complete
+    // snapshot semantic
     setRatisDataRegionSnapshotDir(
         tierDataDirs[0][0] + File.separator + 
IoTDBConstant.SNAPSHOT_FOLDER_NAME);
   }
@@ -1730,7 +1742,8 @@ public class IoTDBConfig {
   public void checkMultiDirStrategyClassName() {
     confirmMultiDirStrategy();
     for (String multiDirStrategy : CLUSTER_ALLOWED_MULTI_DIR_STRATEGIES) {
-      // If the multiDirStrategyClassName is one of cluster allowed strategy, 
the check is passed.
+      // If the multiDirStrategyClassName is one of cluster allowed strategy, 
the
+      // check is passed.
       if (multiDirStrategyClassName.equals(multiDirStrategy)
           || multiDirStrategyClassName.equals(MULTI_DIR_STRATEGY_PREFIX + 
multiDirStrategy)) {
         return;
@@ -2535,6 +2548,22 @@ public class IoTDBConfig {
     this.enableMQTTService = enableMQTTService;
   }
 
+  public boolean isEnableArrowFlightSqlService() {
+    return enableArrowFlightSqlService;
+  }
+
+  public void setEnableArrowFlightSqlService(boolean 
enableArrowFlightSqlService) {
+    this.enableArrowFlightSqlService = enableArrowFlightSqlService;
+  }
+
+  public int getArrowFlightSqlPort() {
+    return arrowFlightSqlPort;
+  }
+
+  public void setArrowFlightSqlPort(int arrowFlightSqlPort) {
+    this.arrowFlightSqlPort = arrowFlightSqlPort;
+  }
+
   public String getMqttHost() {
     return mqttHost;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
index 157734aef56..820a5b75822 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
@@ -32,7 +32,11 @@ public enum BuiltinExternalServices {
   REST(
       "REST",
       "org.apache.iotdb.rest.RestService",
-      
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService);
+      
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService),
+  FLIGHT_SQL(
+      "FLIGHT_SQL",
+      "org.apache.iotdb.flight.FlightSqlService",
+      
IoTDBDescriptor.getInstance().getConfig()::isEnableArrowFlightSqlService);
 
   private final String serviceName;
   private final String className;
diff --git a/pom.xml b/pom.xml
index 99ba9725ca6..334b89200b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,7 @@
         <xz.version>1.9</xz.version>
         <zstd-jni.version>1.5.6-3</zstd-jni.version>
         <tsfile.version>2.2.1-260206-SNAPSHOT</tsfile.version>
+        <arrow.version>17.0.0</arrow.version>
     </properties>
     <!--
     if we claim dependencies in dependencyManagement, then we do not claim
@@ -748,6 +749,16 @@
                 <artifactId>jackson-core</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.arrow</groupId>
+                <artifactId>flight-sql</artifactId>
+                <version>${arrow.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.arrow</groupId>
+                <artifactId>arrow-memory-netty</artifactId>
+                <version>${arrow.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>

Reply via email to