This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ai-code/flight-sql
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ai-code/flight-sql by this
push:
new 0bded83cf4d Add Flight SQL to distribution packaging, Java/Python
examples and tests
0bded83cf4d is described below
commit 0bded83cf4d56e21edaf400d6373f7260e3342a3
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Feb 23 16:48:56 2026 +0800
Add Flight SQL to distribution packaging, Java/Python examples and tests
- Distribution: include flight-sql jar-with-dependencies in all.xml,
datanode.xml, and external-service-impl.xml assembly descriptors
- Example: add flight-sql-example module with three Java examples:
- FlightSqlExample: native Arrow Flight SQL client API
- FlightSqlJdbcExample: JDBC via ArrowFlightJdbcDriver
- FlightSqlAdbcExample: ADBC via FlightSqlDriver
- Python: add integration tests using adbc_driver_flightsql in
iotdb-client/client-py/tests/integration/test_flight_sql.py
- Python: add ADBC example converting query results to DataFrame in
iotdb-client/client-py/flight_sql_example.py
- Root pom.xml: add flight-sql-jdbc-driver and adbc-driver-flight-sql
to dependency management
---
distribution/pom.xml | 6 +
distribution/src/assembly/all.xml | 4 +
distribution/src/assembly/datanode.xml | 4 +
.../src/assembly/external-service-impl.xml | 4 +
example/flight-sql-example/pom.xml | 58 ++++++
.../org/apache/iotdb/FlightSqlAdbcExample.java | 191 ++++++++++++++++++
.../java/org/apache/iotdb/FlightSqlExample.java | 205 +++++++++++++++++++
.../org/apache/iotdb/FlightSqlJdbcExample.java | 163 ++++++++++++++++
example/pom.xml | 1 +
iotdb-client/client-py/flight_sql_example.py | 182 +++++++++++++++++
.../client-py/tests/integration/test_flight_sql.py | 216 +++++++++++++++++++++
pom.xml | 10 +
12 files changed, 1044 insertions(+)
diff --git a/distribution/pom.xml b/distribution/pom.xml
index f4773f6afad..0736e09fb45 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -65,6 +65,12 @@
<version>2.0.7-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>flight-sql</artifactId>
+ <version>2.0.7-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/distribution/src/assembly/all.xml
b/distribution/src/assembly/all.xml
index 1b2e1be054f..9b34f44e011 100644
--- a/distribution/src/assembly/all.xml
+++ b/distribution/src/assembly/all.xml
@@ -100,6 +100,10 @@
<source>${maven.multiModuleProjectDirectory}/external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source>
<outputDirectory>lib</outputDirectory>
</file>
+ <file>
+
<source>${maven.multiModuleProjectDirectory}/external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source>
+ <outputDirectory>lib</outputDirectory>
+ </file>
</files>
<componentDescriptors>
<componentDescriptor>common-files.xml</componentDescriptor>
diff --git a/distribution/src/assembly/datanode.xml
b/distribution/src/assembly/datanode.xml
index 225fa5a7e7d..83443ac0ca6 100644
--- a/distribution/src/assembly/datanode.xml
+++ b/distribution/src/assembly/datanode.xml
@@ -83,6 +83,10 @@
<source>${maven.multiModuleProjectDirectory}/external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source>
<outputDirectory>lib</outputDirectory>
</file>
+ <file>
+
<source>${maven.multiModuleProjectDirectory}/external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source>
+ <outputDirectory>lib</outputDirectory>
+ </file>
</files>
<componentDescriptors>
<componentDescriptor>common-files.xml</componentDescriptor>
diff --git a/distribution/src/assembly/external-service-impl.xml
b/distribution/src/assembly/external-service-impl.xml
index c743fa85597..120cdfbae28 100644
--- a/distribution/src/assembly/external-service-impl.xml
+++ b/distribution/src/assembly/external-service-impl.xml
@@ -51,5 +51,9 @@
<source>${maven.multiModuleProjectDirectory}/external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source>
<outputDirectory>/</outputDirectory>
</file>
+ <file>
+
<source>${maven.multiModuleProjectDirectory}/external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source>
+ <outputDirectory>/</outputDirectory>
+ </file>
</files>
</assembly>
diff --git a/example/flight-sql-example/pom.xml
b/example/flight-sql-example/pom.xml
new file mode 100644
index 00000000000..c87e3682083
--- /dev/null
+++ b/example/flight-sql-example/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-examples</artifactId>
+ <version>2.0.7-SNAPSHOT</version>
+ </parent>
+ <artifactId>flight-sql-example</artifactId>
+ <name>IoTDB: Example: Arrow Flight SQL</name>
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql-jdbc-driver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-driver-flight-sql</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlAdbcExample.java
b/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlAdbcExample.java
new file mode 100644
index 00000000000..9be6ee09337
--- /dev/null
+++
b/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlAdbcExample.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Example demonstrating how to query IoTDB using ADBC (Arrow Database
Connectivity) with the Flight
+ * SQL driver in Java.
+ *
+ * <p>ADBC provides a database-agnostic API that returns data natively as
Apache Arrow columnar
+ * format, avoiding the overhead of row-by-row conversion. This is the
recommended approach for
+ * analytics and data science workloads.
+ *
+ * <p>Before running this example, make sure IoTDB is running with Arrow
Flight SQL enabled:
+ *
+ * <ul>
+ * <li>Set <code>enable_arrow_flight_sql_service=true</code> in
iotdb-system.properties
+ * <li>Default Flight SQL port is 8904
+ * </ul>
+ */
+public class FlightSqlAdbcExample {
+
+ private static final String HOST = "127.0.0.1";
+ private static final int PORT = 8904;
+ private static final String USER = "root";
+ private static final String PASSWORD = "root";
+
+ public static void main(String[] args) throws Exception {
+ // 1. Create the root Arrow allocator; try-with-resources closes it when this block exits
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+ // 2. Build the ADBC Flight SQL driver and its parameters: URI, username and password
+ AdbcDriver driver = new FlightSqlDriver(allocator);
+ Map<String, Object> dbParams = new HashMap<>();
+ dbParams.put(AdbcDriver.PARAM_URI.getKey(),
String.format("grpc+tcp://%s:%d", HOST, PORT));
+ dbParams.put(AdbcDriver.PARAM_USERNAME.getKey(), USER);
+ dbParams.put(AdbcDriver.PARAM_PASSWORD.getKey(), PASSWORD);
+
+ // 3. Open the ADBC database and a connection; both are closed by try-with-resources
+ try (AdbcDatabase db = driver.open(dbParams);
+ AdbcConnection connection = db.connect()) {
+ System.out.println("=== Connected to IoTDB via ADBC Flight SQL ===");
+ System.out.println("URI: grpc+tcp://" + HOST + ":" + PORT);
+
+ // 4. Create the example database and table, then insert four sample rows
+ System.out.println("\n--- Setting up test data ---");
+ executeUpdate(connection, "CREATE DATABASE IF NOT EXISTS
adbc_example_db");
+ executeUpdate(
+ connection,
+ "CREATE TABLE IF NOT EXISTS adbc_example_db.sensor_data ("
+ + "region STRING TAG, "
+ + "device_id STRING TAG, "
+ + "temperature FLOAT FIELD, "
+ + "humidity DOUBLE FIELD, "
+ + "status BOOLEAN FIELD)");
+
+ executeUpdate(
+ connection,
+ "INSERT INTO adbc_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(1, 'north', 'dev001', 25.5, 60.2, true)");
+ executeUpdate(
+ connection,
+ "INSERT INTO adbc_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(2, 'north', 'dev001', 26.1, 58.5, true)");
+ executeUpdate(
+ connection,
+ "INSERT INTO adbc_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(3, 'south', 'dev002', 30.2, 45.0, false)");
+ executeUpdate(
+ connection,
+ "INSERT INTO adbc_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(4, 'south', 'dev002', 31.5, 42.3, false)");
+ System.out.println("Test data inserted successfully.");
+
+ // 5. Query: Full table scan — results returned as Arrow
VectorSchemaRoot
+ System.out.println("\n=== Query 1: Full table scan (Arrow
VectorSchemaRoot) ===");
+ executeQuery(
+ connection,
+ "SELECT time, region, device_id, temperature, humidity, status "
+ + "FROM adbc_example_db.sensor_data ORDER BY time");
+
+ // 6. Filtered query: only rows whose region tag equals 'north', ordered by time
+ System.out.println("\n=== Query 2: Filtered query (region = 'north')
===");
+ executeQuery(
+ connection,
+ "SELECT time, device_id, temperature, humidity "
+ + "FROM adbc_example_db.sensor_data WHERE region = 'north'
ORDER BY time");
+
+ // 7. Aggregation query: row count and average temperature per region
+ System.out.println("\n=== Query 3: Aggregation ===");
+ executeQuery(
+ connection,
+ "SELECT region, COUNT(*) as cnt, AVG(temperature) as avg_temp "
+ + "FROM adbc_example_db.sensor_data GROUP BY region ORDER BY
region");
+
+ // 8. Cleanup: drop the database created in step 4 (skipped if an earlier step throws)
+ executeUpdate(connection, "DROP DATABASE IF EXISTS adbc_example_db");
+ System.out.println("\n=== ADBC Example completed successfully ===");
+ }
+ }
+ }
+
+ /**
+ * Execute a non-query SQL statement via ADBC.
+ *
+ * <p>Creates a statement on the given connection, sets {@code sql} as its query text and calls
+ * {@code executeUpdate()}. The value returned by that call is discarded. The statement is
+ * closed by try-with-resources.
+ *
+ * @param connection ADBC connection used to create the statement
+ * @param sql SQL text to run; this example passes DDL and INSERT statements
+ * @throws Exception anything raised by the ADBC calls; nothing is caught here
+ */
+ private static void executeUpdate(AdbcConnection connection, String sql)
throws Exception {
+ try (AdbcStatement statement = connection.createStatement()) {
+ statement.setSqlQuery(sql);
+ statement.executeUpdate();
+ }
+ }
+
+ /**
+ * Execute a query via ADBC and print Arrow columnar results.
+ *
+ * <p>Prints the SQL text, then reads the result batch by batch from the {@code ArrowReader}.
+ * The column header is built from the schema of the first loaded batch, so it is not printed
+ * when no batch is loaded. Each cell is rendered with {@code %-20s}; null values are printed
+ * as the text {@code null}. The total number of rows printed is reported at the end.
+ *
+ * @param connection ADBC connection used to create the statement
+ * @param sql query text to run
+ * @throws Exception anything raised by the ADBC or Arrow calls; nothing is caught here
+ */
+ private static void executeQuery(AdbcConnection connection, String sql)
throws Exception {
+ try (AdbcStatement statement = connection.createStatement()) {
+ statement.setSqlQuery(sql);
+ AdbcStatement.QueryResult result = statement.executeQuery();
+ try (ArrowReader reader = result.getReader()) {
+ System.out.println("SQL: " + sql);
+
+ int totalRows = 0;
+ boolean headerPrinted = false;
+
+ while (reader.loadNextBatch()) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+
+ // Print the header only for the first batch, using that batch's schema
+ if (!headerPrinted) {
+ List<Field> fields = root.getSchema().getFields();
+ System.out.println("Schema: " + fields.size() + " columns");
+ StringBuilder header = new StringBuilder();
+ StringBuilder separator = new StringBuilder();
+ for (Field field : fields) {
+ header.append(String.format("%-20s", field.getName()));
+ separator.append("--------------------");
+ }
+ System.out.println(header);
+ System.out.println(separator);
+ headerPrinted = true;
+ }
+
+ // Print every row of this batch: one left-aligned 20-character cell per column vector
+ int rowCount = root.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ StringBuilder row = new StringBuilder();
+ for (FieldVector vector : root.getFieldVectors()) {
+ Object value = vector.getObject(i);
+ row.append(String.format("%-20s", value == null ? "null" :
value.toString()));
+ }
+ System.out.println(row);
+ totalRows++;
+ }
+ }
+ System.out.println("Total rows: " + totalRows);
+ }
+ }
+ }
+}
diff --git
a/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlExample.java
b/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlExample.java
new file mode 100644
index 00000000000..ffe111f3950
--- /dev/null
+++
b/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlExample.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.auth2.BasicAuthCredentialWriter;
+import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
+import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.util.List;
+
+/**
+ * Example demonstrating how to use Arrow Flight SQL to query IoTDB.
+ *
+ * <p>Before running this example, make sure IoTDB is running with Arrow
Flight SQL enabled:
+ *
+ * <ul>
+ * <li>Set <code>enable_arrow_flight_sql_service=true</code> in
iotdb-system.properties
+ * <li>Default Flight SQL port is 8904
+ * </ul>
+ */
+public class FlightSqlExample {
+
+ private static final String HOST = "127.0.0.1";
+ private static final int PORT = 8904;
+ private static final String USER = "root";
+ private static final String PASSWORD = "root";
+
+ public static void main(String[] args) throws Exception {
+ // 1. Create the Arrow allocator and the unencrypted gRPC location (HOST:PORT) of the service
+ try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+ Location location = Location.forGrpcInsecure(HOST, PORT);
+
+ // Client middleware built around a bearer-header handler; installed via intercept() below
+ ClientIncomingAuthHeaderMiddleware.Factory authFactory =
+ new ClientIncomingAuthHeaderMiddleware.Factory(new
ClientBearerHeaderHandler());
+
+ try (FlightClient flightClient =
+ FlightClient.builder(allocator,
location).intercept(authFactory).build();
+ FlightSqlClient sqlClient = new FlightSqlClient(flightClient)) {
+
+ // 2. Build a Basic-auth (USER/PASSWORD) call option. NOTE(review): despite its name, bearerToken holds these basic credentials and is passed on every call; no token is read back from authFactory
+ CredentialCallOption bearerToken =
+ new CredentialCallOption(new BasicAuthCredentialWriter(USER,
PASSWORD));
+ System.out.println("=== Connected to IoTDB Flight SQL service ===");
+ System.out.println("Host: " + HOST + ", Port: " + PORT);
+
+ // 3. Create the example database and table (two tags, four fields)
+ System.out.println("\n--- Setting up test data ---");
+ executeUpdate(sqlClient, "CREATE DATABASE IF NOT EXISTS
flight_example_db", bearerToken);
+ executeUpdate(
+ sqlClient,
+ "CREATE TABLE IF NOT EXISTS flight_example_db.sensor_data ("
+ + "region STRING TAG, "
+ + "device_id STRING TAG, "
+ + "temperature FLOAT FIELD, "
+ + "humidity DOUBLE FIELD, "
+ + "status BOOLEAN FIELD, "
+ + "description TEXT FIELD)",
+ bearerToken);
+
+ // Insert four sample rows (time 1-4), two per region
+ executeUpdate(
+ sqlClient,
+ "INSERT INTO flight_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status,
description) "
+ + "VALUES(1, 'north', 'dev001', 25.5, 60.2, true, 'normal')",
+ bearerToken);
+ executeUpdate(
+ sqlClient,
+ "INSERT INTO flight_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status,
description) "
+ + "VALUES(2, 'north', 'dev001', 26.1, 58.5, true, 'normal')",
+ bearerToken);
+ executeUpdate(
+ sqlClient,
+ "INSERT INTO flight_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status,
description) "
+ + "VALUES(3, 'south', 'dev002', 30.2, 45.0, false, 'high
temp')",
+ bearerToken);
+ executeUpdate(
+ sqlClient,
+ "INSERT INTO flight_example_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status,
description) "
+ + "VALUES(4, 'south', 'dev002', 31.5, 42.3, false, 'high
temp')",
+ bearerToken);
+ System.out.println("Test data inserted successfully.");
+
+ // 4. Query: select every column of every row, ordered by time
+ System.out.println("\n=== Query 1: Full table scan ===");
+ executeQuery(
+ sqlClient,
+ "SELECT time, region, device_id, temperature, humidity, status,
description "
+ + "FROM flight_example_db.sensor_data ORDER BY time",
+ bearerToken);
+
+ // 5. Query with filter: only rows whose region tag equals 'north'
+ System.out.println("\n=== Query 2: Filtered query (region = 'north')
===");
+ executeQuery(
+ sqlClient,
+ "SELECT time, device_id, temperature, humidity "
+ + "FROM flight_example_db.sensor_data WHERE region = 'north'
ORDER BY time",
+ bearerToken);
+
+ // 6. Aggregation query: row count and average temperature per region
+ System.out.println("\n=== Query 3: Aggregation (AVG temperature by
region) ===");
+ executeQuery(
+ sqlClient,
+ "SELECT region, COUNT(*) as cnt, AVG(temperature) as avg_temp "
+ + "FROM flight_example_db.sensor_data GROUP BY region ORDER BY
region",
+ bearerToken);
+
+ // 7. Cleanup: drop the database created in step 3 (skipped if an earlier step throws)
+ executeUpdate(sqlClient, "DROP DATABASE IF EXISTS flight_example_db",
bearerToken);
+ System.out.println("\n=== Example completed successfully ===");
+ }
+ }
+ }
+
+ /**
+ * Execute a non-query SQL statement (DDL/DML).
+ *
+ * <p>Submits {@code sql} with {@code FlightSqlClient.execute} and then opens and drains the
+ * stream of every endpoint in the returned {@code FlightInfo}. Any data in those streams is
+ * discarded.
+ *
+ * @param sqlClient Flight SQL client to send the statement through
+ * @param sql statement text to run
+ * @param bearerToken call option passed to both the execute and getStream calls
+ * @throws Exception anything raised by the Flight calls; nothing is caught here
+ */
+ private static void executeUpdate(
+ FlightSqlClient sqlClient, String sql, CredentialCallOption bearerToken)
throws Exception {
+ FlightInfo info = sqlClient.execute(sql, bearerToken);
+ // Drain each endpoint's stream; presumably the server only runs the statement once the stream is read (TODO confirm)
+ for (FlightEndpoint endpoint : info.getEndpoints()) {
+ try (FlightStream stream = sqlClient.getStream(endpoint.getTicket(),
bearerToken)) {
+ while (stream.next()) {
+ // nothing to do: batches are read only to exhaust the stream
+ }
+ }
+ }
+ }
+
+ /**
+ * Execute a query and print results in table format.
+ *
+ * <p>Submits {@code sql} with {@code FlightSqlClient.execute}, prints the SQL text, the column
+ * count and a header line built from the schema carried by the returned {@code FlightInfo},
+ * then reads the stream of every endpoint and prints each row. Each cell is rendered with
+ * {@code %-20s}; null values are printed as the text {@code null}. The total number of rows
+ * printed is reported at the end.
+ *
+ * @param sqlClient Flight SQL client to send the query through
+ * @param sql query text to run
+ * @param bearerToken call option passed to both the execute and getStream calls
+ * @throws Exception anything raised by the Flight calls; nothing is caught here
+ */
+ private static void executeQuery(
+ FlightSqlClient sqlClient, String sql, CredentialCallOption bearerToken)
throws Exception {
+ FlightInfo flightInfo = sqlClient.execute(sql, bearerToken);
+
+ // Print the schema reported by FlightInfo, before any data stream is opened
+ Schema schema = flightInfo.getSchema();
+ List<Field> fields = schema.getFields();
+ System.out.println("SQL: " + sql);
+ System.out.println("Schema: " + fields.size() + " columns");
+
+ // Print column headers: one 20-character cell per field plus a dashed separator line
+ StringBuilder header = new StringBuilder();
+ StringBuilder separator = new StringBuilder();
+ for (Field field : fields) {
+ String colName = field.getName();
+ header.append(String.format("%-20s", colName));
+ separator.append("--------------------");
+ }
+ System.out.println(header);
+ System.out.println(separator);
+
+ // Fetch every endpoint's stream in turn and print each row of each batch
+ int totalRows = 0;
+ for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
+ try (FlightStream stream = sqlClient.getStream(endpoint.getTicket(),
bearerToken)) {
+ while (stream.next()) {
+ VectorSchemaRoot root = stream.getRoot();
+ int rowCount = root.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ StringBuilder row = new StringBuilder();
+ for (FieldVector vector : root.getFieldVectors()) {
+ Object value = vector.getObject(i);
+ row.append(String.format("%-20s", value == null ? "null" :
value.toString()));
+ }
+ System.out.println(row);
+ totalRows++;
+ }
+ }
+ }
+ }
+ System.out.println("Total rows: " + totalRows);
+ }
+}
diff --git
a/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlJdbcExample.java
b/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlJdbcExample.java
new file mode 100644
index 00000000000..71c3fd58658
--- /dev/null
+++
b/example/flight-sql-example/src/main/java/org/apache/iotdb/FlightSqlJdbcExample.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+/**
+ * Example demonstrating how to query IoTDB via Arrow Flight SQL JDBC driver.
+ *
+ * <p>The Arrow Flight SQL JDBC driver ({@code
org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver})
+ * provides a standard JDBC interface over the Arrow Flight SQL protocol,
allowing you to use
+ * familiar JDBC APIs with Arrow's high-performance columnar data transfer.
+ *
+ * <p>Before running this example, make sure IoTDB is running with Arrow
Flight SQL enabled:
+ *
+ * <ul>
+ * <li>Set <code>enable_arrow_flight_sql_service=true</code> in
iotdb-system.properties
+ * <li>Default Flight SQL port is 8904
+ * </ul>
+ */
+public class FlightSqlJdbcExample {
+
+ private static final String HOST = "127.0.0.1";
+ private static final int PORT = 8904;
+ private static final String USER = "root";
+ private static final String PASSWORD = "root";
+
+ public static void main(String[] args) throws Exception {
+ // Build the Arrow Flight SQL JDBC URL for HOST:PORT with useEncryption and useSystemTrustStore both set to false
+ String jdbcUrl =
+ String.format(
+
"jdbc:arrow-flight-sql://%s:%d?useEncryption=false&useSystemTrustStore=false",
+ HOST, PORT);
+
+ // Credentials are passed as the "user" and "password" connection properties
+ Properties properties = new Properties();
+ properties.put("user", USER);
+ properties.put("password", PASSWORD);
+
+ // Load the driver class explicitly; Class.forName throws if it is not on the classpath
+ Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver");
+
+ try (Connection connection = DriverManager.getConnection(jdbcUrl,
properties)) {
+ System.out.println("=== Connected to IoTDB via Arrow Flight SQL JDBC
===");
+ System.out.println("URL: " + jdbcUrl);
+
+ try (Statement statement = connection.createStatement()) {
+ // 1. Create the example database and table (two tags, three fields)
+ System.out.println("\n--- Setting up test data ---");
+ statement.execute("CREATE DATABASE IF NOT EXISTS jdbc_flight_db");
+ statement.execute(
+ "CREATE TABLE IF NOT EXISTS jdbc_flight_db.sensor_data ("
+ + "region STRING TAG, "
+ + "device_id STRING TAG, "
+ + "temperature FLOAT FIELD, "
+ + "humidity DOUBLE FIELD, "
+ + "status BOOLEAN FIELD)");
+
+ // 2. Insert four sample rows (time 1-4), two per region
+ statement.execute(
+ "INSERT INTO jdbc_flight_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(1, 'north', 'dev001', 25.5, 60.2, true)");
+ statement.execute(
+ "INSERT INTO jdbc_flight_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(2, 'north', 'dev001', 26.1, 58.5, true)");
+ statement.execute(
+ "INSERT INTO jdbc_flight_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(3, 'south', 'dev002', 30.2, 45.0, false)");
+ statement.execute(
+ "INSERT INTO jdbc_flight_db.sensor_data"
+ + "(time, region, device_id, temperature, humidity, status) "
+ + "VALUES(4, 'south', 'dev002', 31.5, 42.3, false)");
+ System.out.println("Test data inserted successfully.");
+
+ // 3. Query: select every column of every row; the ResultSet is closed by try-with-resources
+ System.out.println("\n=== Query 1: Full table scan (JDBC ResultSet)
===");
+ try (ResultSet rs =
+ statement.executeQuery(
+ "SELECT time, region, device_id, temperature, humidity, status
"
+ + "FROM jdbc_flight_db.sensor_data ORDER BY time")) {
+ printResultSet(rs);
+ }
+
+ // 4. Filtered query: only rows whose region tag equals 'north', ordered by time
+ System.out.println("\n=== Query 2: Filtered query (region = 'north')
===");
+ try (ResultSet rs =
+ statement.executeQuery(
+ "SELECT time, device_id, temperature, humidity "
+ + "FROM jdbc_flight_db.sensor_data WHERE region = 'north'
ORDER BY time")) {
+ printResultSet(rs);
+ }
+
+ // 5. Aggregation query: row count and average temperature per region
+ System.out.println("\n=== Query 3: Aggregation ===");
+ try (ResultSet rs =
+ statement.executeQuery(
+ "SELECT region, COUNT(*) as cnt, AVG(temperature) as avg_temp "
+ + "FROM jdbc_flight_db.sensor_data GROUP BY region ORDER
BY region")) {
+ printResultSet(rs);
+ }
+
+ // 6. Cleanup: drop the database created in step 1 (skipped if an earlier step throws)
+ statement.execute("DROP DATABASE IF EXISTS jdbc_flight_db");
+ }
+
+ System.out.println("\n=== JDBC Example completed successfully ===");
+ }
+ }
+
+ /**
+ * Print a JDBC ResultSet in table format.
+ *
+ * <p>Prints a header line built from the column names in the result set's metadata, a dashed
+ * separator, one line per row, and finally the number of rows read. Each cell is rendered with
+ * {@code %-20s}; null values are printed as the text {@code null}. The result set is read to
+ * the end but is not closed here.
+ *
+ * @param rs result set to read
+ * @throws SQLException if reading the metadata or the rows fails
+ */
+ private static void printResultSet(ResultSet rs) throws SQLException {
+ ResultSetMetaData metaData = rs.getMetaData();
+ int columnCount = metaData.getColumnCount();
+
+ // Print column headers; columns are addressed from index 1 up to columnCount
+ StringBuilder header = new StringBuilder();
+ StringBuilder separator = new StringBuilder();
+ for (int i = 1; i <= columnCount; i++) {
+ header.append(String.format("%-20s", metaData.getColumnName(i)));
+ separator.append("--------------------");
+ }
+ System.out.println(header);
+ System.out.println(separator);
+
+ // Print rows until rs.next() returns false, counting them as we go
+ int rowCount = 0;
+ while (rs.next()) {
+ StringBuilder row = new StringBuilder();
+ for (int i = 1; i <= columnCount; i++) {
+ Object value = rs.getObject(i);
+ row.append(String.format("%-20s", value == null ? "null" :
value.toString()));
+ }
+ System.out.println(row);
+ rowCount++;
+ }
+ System.out.println("Total rows: " + rowCount);
+ }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 9af648e0d26..10af5c4cfbc 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -40,6 +40,7 @@
<module>session</module>
<module>trigger</module>
<module>udf</module>
+ <module>flight-sql-example</module>
</modules>
<build>
<pluginManagement>
diff --git a/iotdb-client/client-py/flight_sql_example.py
b/iotdb-client/client-py/flight_sql_example.py
new file mode 100644
index 00000000000..2b6b53b8d80
--- /dev/null
+++ b/iotdb-client/client-py/flight_sql_example.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Flight SQL Example: Query IoTDB via ADBC and convert results to pandas DataFrame.
+
+This example demonstrates how to use the ADBC Flight SQL driver to:
+1. Connect to IoTDB's Arrow Flight SQL service
+2. Execute SQL queries
+3. Convert query results directly to pandas DataFrame
+
+Requirements:
+ pip install adbc_driver_flightsql pyarrow pandas
+
+Before running, ensure IoTDB is running with Arrow Flight SQL enabled:
+ - Set enable_arrow_flight_sql_service=true in iotdb-system.properties
+ - Default Flight SQL port is 8904
+"""
+
+import adbc_driver_flightsql.dbapi as flight_sql
+import pandas as pd
+
+# IoTDB Flight SQL connection parameters
+FLIGHT_SQL_URI = "grpc://localhost:8904"
+USERNAME = "root"
+PASSWORD = "root"
+
+
+def get_connection():
+ """Create an ADBC Flight SQL connection to IoTDB."""
+ return flight_sql.connect(
+ uri=FLIGHT_SQL_URI,
+ db_kwargs={"username": USERNAME, "password": PASSWORD},
+ )
+
+
+def setup_data(conn):
+ """Create database, table, and insert sample data."""
+ cursor = conn.cursor()
+
+ # Create database
+ cursor.execute("CREATE DATABASE IF NOT EXISTS flight_example_db")
+
+ # Create table
+ cursor.execute(
+ "CREATE TABLE IF NOT EXISTS flight_example_db.sensor_data ("
+ "region STRING TAG, "
+ "device_id STRING TAG, "
+ "temperature FLOAT FIELD, "
+ "humidity DOUBLE FIELD, "
+ "status BOOLEAN FIELD, "
+ "description TEXT FIELD)"
+ )
+
+ # Insert sample data
+ insert_sqls = [
+ "INSERT INTO flight_example_db.sensor_data"
+ "(time, region, device_id, temperature, humidity, status, description)
"
+ "VALUES(1, 'north', 'dev001', 25.5, 60.2, true, 'normal')",
+ "INSERT INTO flight_example_db.sensor_data"
+ "(time, region, device_id, temperature, humidity, status, description)
"
+ "VALUES(2, 'north', 'dev001', 26.1, 58.5, true, 'normal')",
+ "INSERT INTO flight_example_db.sensor_data"
+ "(time, region, device_id, temperature, humidity, status, description)
"
+ "VALUES(3, 'south', 'dev002', 30.2, 45.0, false, 'high temp')",
+ "INSERT INTO flight_example_db.sensor_data"
+ "(time, region, device_id, temperature, humidity, status, description)
"
+ "VALUES(4, 'south', 'dev002', 31.5, 42.3, false, 'high temp')",
+ "INSERT INTO flight_example_db.sensor_data"
+ "(time, region, device_id, temperature, humidity, status, description)
"
+ "VALUES(5, 'north', 'dev003', 22.0, 70.1, true, 'cool')",
+ ]
+ for sql in insert_sqls:
+ cursor.execute(sql)
+
+ cursor.close()
+ print("Sample data inserted successfully.")
+
+
+def query_to_dataframe(conn, sql):
+ """Execute a SQL query and return results as a pandas DataFrame."""
+ cursor = conn.cursor()
+ cursor.execute(sql)
+ # Fetch result as Arrow Table, then convert to pandas DataFrame
+ arrow_table = cursor.fetch_arrow_table()
+ df = arrow_table.to_pandas()
+ cursor.close()
+ return df
+
+
+def main():
+ print("=" * 60)
+ print("IoTDB Arrow Flight SQL Example (Python ADBC)")
+ print("=" * 60)
+
+ # Connect to IoTDB Flight SQL service
+ conn = get_connection()
+ print(f"\nConnected to IoTDB Flight SQL at {FLIGHT_SQL_URI}")
+
+ # Setup sample data
+ print("\n--- Setting up test data ---")
+ setup_data(conn)
+
+ # Example 1: Full table scan → DataFrame
+ print("\n" + "=" * 60)
+ print("Example 1: Full table scan → DataFrame")
+ print("=" * 60)
+ df = query_to_dataframe(
+ conn,
+ "SELECT time, region, device_id, temperature, humidity, status,
description "
+ "FROM flight_example_db.sensor_data ORDER BY time",
+ )
+ print(f"\nDataFrame shape: {df.shape}")
+ print(f"Column types:\n{df.dtypes}\n")
+ print(df.to_string(index=False))
+
+ # Example 2: Filtered query → DataFrame
+ print("\n" + "=" * 60)
+ print("Example 2: Filtered query (region = 'north') → DataFrame")
+ print("=" * 60)
+ df_north = query_to_dataframe(
+ conn,
+ "SELECT time, device_id, temperature, humidity "
+ "FROM flight_example_db.sensor_data WHERE region = 'north' ORDER BY
time",
+ )
+ print(f"\nDataFrame shape: {df_north.shape}")
+ print(df_north.to_string(index=False))
+
+ # Example 3: Aggregation query → DataFrame
+ print("\n" + "=" * 60)
+ print("Example 3: Aggregation → DataFrame")
+ print("=" * 60)
+ df_agg = query_to_dataframe(
+ conn,
+ "SELECT region, COUNT(*) as cnt, AVG(temperature) as avg_temp, "
+ "MAX(humidity) as max_humidity "
+ "FROM flight_example_db.sensor_data GROUP BY region ORDER BY region",
+ )
+ print(f"\nAggregation DataFrame shape: {df_agg.shape}")
+ print(df_agg.to_string(index=False))
+
+ # Example 4: Using pandas operations on the DataFrame
+ print("\n" + "=" * 60)
+ print("Example 4: Pandas operations on Flight SQL results")
+ print("=" * 60)
+ df_all = query_to_dataframe(
+ conn,
+ "SELECT time, region, device_id, temperature, humidity "
+ "FROM flight_example_db.sensor_data ORDER BY time",
+ )
+ # Compute statistics
+ print(f"\nTemperature statistics:\n{df_all['temperature'].describe()}")
+ print(f"\nGroup by region mean:\n{df_all.groupby('region')[['temperature',
'humidity']].mean()}")
+
+ # Cleanup
+ cursor = conn.cursor()
+ cursor.execute("DROP DATABASE IF EXISTS flight_example_db")
+ cursor.close()
+
+ conn.close()
+ print("\n" + "=" * 60)
+ print("Example completed successfully!")
+ print("=" * 60)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/iotdb-client/client-py/tests/integration/test_flight_sql.py b/iotdb-client/client-py/tests/integration/test_flight_sql.py
new file mode 100644
index 00000000000..03ef3ac9271
--- /dev/null
+++ b/iotdb-client/client-py/tests/integration/test_flight_sql.py
@@ -0,0 +1,216 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Integration tests for Arrow Flight SQL service in IoTDB.
+
+Requirements:
+ pip install adbc_driver_flightsql pyarrow pandas
+"""
+
+import pytest
+
+try:
+ import adbc_driver_flightsql.dbapi as flight_sql
+ import pyarrow
+
+ HAS_FLIGHT_SQL = True
+except ImportError:
+ HAS_FLIGHT_SQL = False
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.Tablet import ColumnType, Tablet
+from .iotdb_container import IoTDBContainer
+
+# Default Arrow Flight SQL port
+FLIGHT_SQL_PORT = 8904
+
+
[email protected](scope="module")
+def iotdb_flight_sql():
+ """Start an IoTDB container with Flight SQL port exposed and prepare test
data."""
+ with IoTDBContainer("iotdb:dev") as db:
+ db.with_exposed_ports(FLIGHT_SQL_PORT)
+
+ # Insert test data via standard session
+ session = Session(
+ db.get_container_host_ip(),
+ db.get_exposed_port(6667),
+ "root",
+ "root",
+ )
+ session.open(False)
+
+ # Create database and table
+ session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS
flight_test_db")
+ session.execute_statement(
+ "CREATE TABLE IF NOT EXISTS flight_test_db.test_table ("
+ "device_id STRING TAG, "
+ "temperature FLOAT FIELD, "
+ "humidity DOUBLE FIELD, "
+ "status BOOLEAN FIELD, "
+ "info TEXT FIELD)"
+ )
+
+ # Insert sample data
+ column_names = ["device_id", "temperature", "humidity", "status",
"info"]
+ data_types = [
+ TSDataType.STRING,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.BOOLEAN,
+ TSDataType.STRING,
+ ]
+ column_types = [
+ ColumnType.TAG,
+ ColumnType.FIELD,
+ ColumnType.FIELD,
+ ColumnType.FIELD,
+ ColumnType.FIELD,
+ ]
+ timestamps = list(range(1, 6))
+ values = [
+ ["dev001", 25.5, 60.2, True, "normal"],
+ ["dev001", 26.1, 58.5, True, "normal"],
+ ["dev002", 30.2, 45.0, False, "high temp"],
+ ["dev002", 31.5, 42.3, False, "high temp"],
+ ["dev001", 24.8, 62.1, True, "normal"],
+ ]
+ tablet = Tablet(
+ "test_table", column_names, data_types, values, timestamps,
column_types
+ )
+ session.insert_relational_tablet(tablet)
+ session.execute_non_query_statement("FLUSH")
+ session.close()
+
+ yield db
+
+
[email protected](not HAS_FLIGHT_SQL, reason="adbc_driver_flightsql not
installed")
+def test_flight_sql_query(iotdb_flight_sql):
+ """Test querying IoTDB via Arrow Flight SQL and verify Arrow Table
result."""
+ db = iotdb_flight_sql
+ host = db.get_container_host_ip()
+ port = db.get_exposed_port(FLIGHT_SQL_PORT)
+
+ uri = f"grpc://{host}:{port}"
+ conn = flight_sql.connect(
+ uri=uri,
+ db_kwargs={"username": "root", "password": "root"},
+ )
+ cursor = conn.cursor()
+
+ cursor.execute(
+ "SELECT time, device_id, temperature, humidity, status, info "
+ "FROM flight_test_db.test_table ORDER BY time"
+ )
+ table = cursor.fetch_arrow_table()
+
+ # Verify we got all 5 rows
+ assert table.num_rows == 5, f"Expected 5 rows, got {table.num_rows}"
+ # Verify columns
+ assert table.num_columns == 6, f"Expected 6 columns, got
{table.num_columns}"
+
+ cursor.close()
+ conn.close()
+
+
[email protected](not HAS_FLIGHT_SQL, reason="adbc_driver_flightsql not
installed")
+def test_flight_sql_to_dataframe(iotdb_flight_sql):
+ """Test converting Flight SQL results directly to pandas DataFrame."""
+ db = iotdb_flight_sql
+ host = db.get_container_host_ip()
+ port = db.get_exposed_port(FLIGHT_SQL_PORT)
+
+ uri = f"grpc://{host}:{port}"
+ conn = flight_sql.connect(
+ uri=uri,
+ db_kwargs={"username": "root", "password": "root"},
+ )
+ cursor = conn.cursor()
+
+ cursor.execute(
+ "SELECT time, device_id, temperature, humidity "
+ "FROM flight_test_db.test_table ORDER BY time"
+ )
+ table = cursor.fetch_arrow_table()
+ df = table.to_pandas()
+
+ # Verify DataFrame shape
+ assert df.shape == (5, 4), f"Expected shape (5, 4), got {df.shape}"
+
+ # Verify column names
+ expected_cols = {"time", "device_id", "temperature", "humidity"}
+ assert set(df.columns) == expected_cols, f"Unexpected columns:
{df.columns.tolist()}"
+
+ cursor.close()
+ conn.close()
+
+
[email protected](not HAS_FLIGHT_SQL, reason="adbc_driver_flightsql not
installed")
+def test_flight_sql_aggregation(iotdb_flight_sql):
+ """Test aggregation query via Flight SQL."""
+ db = iotdb_flight_sql
+ host = db.get_container_host_ip()
+ port = db.get_exposed_port(FLIGHT_SQL_PORT)
+
+ uri = f"grpc://{host}:{port}"
+ conn = flight_sql.connect(
+ uri=uri,
+ db_kwargs={"username": "root", "password": "root"},
+ )
+ cursor = conn.cursor()
+
+ cursor.execute(
+ "SELECT device_id, COUNT(*) as cnt, AVG(temperature) as avg_temp "
+ "FROM flight_test_db.test_table GROUP BY device_id ORDER BY device_id"
+ )
+ table = cursor.fetch_arrow_table()
+ df = table.to_pandas()
+
+ # Should have 2 groups (dev001 and dev002)
+ assert df.shape[0] == 2, f"Expected 2 groups, got {df.shape[0]}"
+
+ cursor.close()
+ conn.close()
+
+
[email protected](not HAS_FLIGHT_SQL, reason="adbc_driver_flightsql not
installed")
+def test_flight_sql_empty_result(iotdb_flight_sql):
+ """Test query that returns empty result via Flight SQL."""
+ db = iotdb_flight_sql
+ host = db.get_container_host_ip()
+ port = db.get_exposed_port(FLIGHT_SQL_PORT)
+
+ uri = f"grpc://{host}:{port}"
+ conn = flight_sql.connect(
+ uri=uri,
+ db_kwargs={"username": "root", "password": "root"},
+ )
+ cursor = conn.cursor()
+
+ cursor.execute(
+ "SELECT * FROM flight_test_db.test_table WHERE device_id =
'nonexistent'"
+ )
+ table = cursor.fetch_arrow_table()
+
+ assert table.num_rows == 0, f"Expected 0 rows, got {table.num_rows}"
+
+ cursor.close()
+ conn.close()
diff --git a/pom.xml b/pom.xml
index 334b89200b5..9312be038d3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -759,6 +759,16 @@
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-sql-jdbc-driver</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow.adbc</groupId>
+ <artifactId>adbc-driver-flight-sql</artifactId>
+ <version>0.22.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<build>