This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ai-code/flight-sql
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 835b516cab32c6a9f3e1722581a900fa6b672222
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Feb 24 12:13:00 2026 +0800

    partial fix
---
 .../apache/iotdb/flight/FlightSqlAuthHandler.java  |  2 +-
 .../iotdb/flight/IoTDBFlightSqlProducer.java       | 53 ++++++++++++---
 integration-test/pom.xml                           |  7 ++
 integration-test/src/assembly/mpp-share.xml        |  4 ++
 .../it/flightsql/IoTDBArrowFlightSqlIT.java        | 77 +++++++++++++---------
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 26 +++++++-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  4 ++
 7 files changed, 130 insertions(+), 43 deletions(-)

diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
index b591665eab0..cb8faee54b4 100644
--- 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java
@@ -48,7 +48,7 @@ public class FlightSqlAuthHandler implements 
BasicCallHeaderAuthenticator.Creden
     LOGGER.debug("Validating credentials for user: {}", username);
 
     try {
-      String token = sessionManager.authenticate(username, password, 
"unknown");
+      String token = sessionManager.authenticate(username, password, 
"127.0.0.1");
       // Return the token as the peer identity; 
GeneratedBearerTokenAuthenticator
       // wraps it in a Bearer token and sets it in the response header.
       return () -> token;
diff --git 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
index 067ccfb20d4..fa8286db3bf 100644
--- 
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
+++ 
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
@@ -54,7 +54,6 @@ import org.apache.tsfile.read.common.block.TsBlock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
 import java.time.ZoneId;
 import java.util.Collections;
 import java.util.Optional;
@@ -109,7 +108,7 @@ public class IoTDBFlightSqlProducer implements 
FlightSqlProducer {
   public FlightInfo getFlightInfoStatement(
       FlightSql.CommandStatementQuery command, CallContext context, 
FlightDescriptor descriptor) {
     String sql = command.getQuery();
-    LOGGER.debug("getFlightInfoStatement: {}", sql);
+    LOGGER.warn("getFlightInfoStatement called with SQL: {}", sql);
 
     IClientSession session = getSessionFromContext(context);
 
@@ -120,6 +119,18 @@ public class IoTDBFlightSqlProducer implements 
FlightSqlProducer {
       // Parse the SQL statement
       Statement statement = sqlParser.createStatement(sql, 
ZoneId.systemDefault(), session);
 
+      // Handle USE database statement: set database on session and return 
empty
+      // result
+      if (statement instanceof 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use) {
+        org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use useStmt =
+            (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use) 
statement;
+        String dbName = useStmt.getDatabaseId().getValue();
+        session.setDatabaseName(dbName);
+        LOGGER.info("Flight SQL session database set to: {}", dbName);
+        Schema emptySchema = new Schema(Collections.emptyList());
+        return new FlightInfo(emptySchema, descriptor, 
Collections.emptyList(), -1, -1);
+      }
+
       // Execute via Coordinator (Table model)
       ExecutionResult result =
           coordinator.executeForTableModel(
@@ -153,20 +164,31 @@ public class IoTDBFlightSqlProducer implements 
FlightSqlProducer {
       // Store the query context for later getStream calls
       activeQueries.put(queryId, new QueryContext(queryExecution, header, 
session));
 
-      // Build ticket containing the queryId
-      byte[] ticketBytes = 
Long.toString(queryId).getBytes(StandardCharsets.UTF_8);
-      Ticket ticket = new Ticket(ticketBytes);
+      // Build ticket containing the queryId as a TicketStatementQuery 
protobuf.
+      // FlightSqlClient.getStream() expects this format to route DoGet to
+      // getStreamStatement().
+      ByteString handle = ByteString.copyFromUtf8(Long.toString(queryId));
+      FlightSql.TicketStatementQuery ticketQuery =
+          
FlightSql.TicketStatementQuery.newBuilder().setStatementHandle(handle).build();
+      Ticket ticket = new 
Ticket(com.google.protobuf.Any.pack(ticketQuery).toByteArray());
       FlightEndpoint endpoint = new FlightEndpoint(ticket);
 
       return new FlightInfo(arrowSchema, descriptor, 
Collections.singletonList(endpoint), -1, -1);
 
-    } catch (RuntimeException e) {
+    } catch (Exception e) {
       // Cleanup on error
       if (queryId != null) {
         coordinator.cleanupQueryExecution(queryId);
         activeQueries.remove(queryId);
       }
-      throw e;
+      LOGGER.error("Error executing Flight SQL query: {}", sql, e);
+      if (e instanceof org.apache.arrow.flight.FlightRuntimeException) {
+        throw (org.apache.arrow.flight.FlightRuntimeException) e;
+      }
+      throw CallStatus.INTERNAL
+          .withDescription("Query execution error: " + e.getMessage())
+          .withCause(e)
+          .toRuntimeException();
     }
   }
 
@@ -185,7 +207,16 @@ public class IoTDBFlightSqlProducer implements 
FlightSqlProducer {
       ServerStreamListener listener) {
     ByteString handle = ticketQuery.getStatementHandle();
     long queryId = Long.parseLong(handle.toStringUtf8());
-    streamQueryResults(queryId, listener);
+    LOGGER.warn("getStreamStatement called for queryId={}", queryId);
+    try {
+      streamQueryResults(queryId, listener);
+    } catch (Exception e) {
+      LOGGER.error("getStreamStatement failed for queryId={}", queryId, e);
+      listener.error(
+          CallStatus.INTERNAL
+              .withDescription("getStreamStatement error: " + e.getMessage())
+              .toRuntimeException());
+    }
   }
 
   /** Streams query results for a given queryId as Arrow VectorSchemaRoot 
batches. */
@@ -220,6 +251,12 @@ public class IoTDBFlightSqlProducer implements 
FlightSqlProducer {
     } catch (IoTDBException e) {
       LOGGER.error("Error streaming query results for queryId={}", queryId, e);
       
listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException());
+    } catch (Exception e) {
+      LOGGER.error("Unexpected error streaming query results for queryId={}", 
queryId, e);
+      listener.error(
+          CallStatus.INTERNAL
+              .withDescription("Streaming error: " + e.getMessage())
+              .toRuntimeException());
     } finally {
       coordinator.cleanupQueryExecution(queryId);
       activeQueries.remove(queryId);
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 4ef730957e4..06382ea9913 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -246,6 +246,13 @@
             <!--We will integrate rest-jar-with-dependencies into lib by 
assembly plugin-->
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>flight-sql</artifactId>
+            <version>2.0.7-SNAPSHOT</version>
+            <!--We will integrate flight-sql-jar-with-dependencies into lib by 
assembly plugin-->
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>flight-sql</artifactId>
diff --git a/integration-test/src/assembly/mpp-share.xml 
b/integration-test/src/assembly/mpp-share.xml
index 70072e8282e..74de1ae8b23 100644
--- a/integration-test/src/assembly/mpp-share.xml
+++ b/integration-test/src/assembly/mpp-share.xml
@@ -39,5 +39,9 @@
             
<source>${project.basedir}/../external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source>
             <outputDirectory>lib</outputDirectory>
         </file>
+        <file>
+            
<source>${project.basedir}/../external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source>
+            <outputDirectory>lib</outputDirectory>
+        </file>
     </files>
 </assembly>
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
index d931ca77999..b683df9a1c4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
@@ -16,9 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.relational.it.flightsql;
 
 import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.TableClusterIT;
@@ -41,8 +43,8 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -50,49 +52,58 @@ import org.junit.runner.RunWith;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration tests for Arrow Flight SQL service in IoTDB. Tests the 
end-to-end flow: client
- * connects via Flight SQL protocol, authenticates, executes SQL queries, and 
receives
- * Arrow-formatted results.
+ * connects via Flight SQL protocol, authenticates via auth2 Bearer token, 
executes SQL queries, and
+ * receives Arrow-formatted results.
+ *
+ * <p>Uses the standard auth2 pattern: ClientIncomingAuthHeaderMiddleware 
intercepts the first
+ * call's response to cache the Bearer token, which is then automatically sent 
on subsequent calls.
+ * All queries use fully qualified table names (database.table) for clarity.
  */
 @RunWith(IoTDBTestRunner.class)
 @Category({TableLocalStandaloneIT.class, TableClusterIT.class})
 public class IoTDBArrowFlightSqlIT {
 
   private static final String DATABASE = "flightsql_test_db";
-  private static final String USER = "root";
-  private static final String PASSWORD = "root";
+  private static final String TABLE = DATABASE + ".test_table";
 
-  private BufferAllocator allocator;
-  private FlightClient flightClient;
-  private FlightSqlClient flightSqlClient;
-  private CredentialCallOption bearerToken;
+  private static BufferAllocator allocator;
+  private static FlightClient flightClient;
+  private static FlightSqlClient flightSqlClient;
+  private static CredentialCallOption credentials;
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUpClass() throws Exception {
     // Configure and start the cluster with Arrow Flight SQL enabled
     BaseEnv baseEnv = EnvFactory.getEnv();
-    
baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true);
+    baseEnv.getConfig().getCommonConfig().setEnableArrowFlightSqlService(true);
     baseEnv.initClusterEnvironment();
 
     // Get the Flight SQL port from the data node
     int port = EnvFactory.getEnv().getArrowFlightSqlPort();
 
-    // Create Arrow allocator and Flight client with Bearer token auth 
middleware
+    // Create Arrow allocator and Flight client with Bearer token auth 
middleware.
+    // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from 
the
+    // server's
+    // response on the first authenticated call, and automatically attaches it 
to
+    // all
+    // subsequent calls — ensuring they reuse the same server-side session.
     allocator = new RootAllocator(Long.MAX_VALUE);
     Location location = Location.forGrpcInsecure("127.0.0.1", port);
-
-    // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from 
the
-    // auth handshake
     ClientIncomingAuthHeaderMiddleware.Factory authFactory =
         new ClientIncomingAuthHeaderMiddleware.Factory(new 
ClientBearerHeaderHandler());
-
     flightClient = FlightClient.builder(allocator, 
location).intercept(authFactory).build();
 
-    // Authenticate: sends Basic credentials, server returns Bearer token
-    bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, 
PASSWORD));
+    // Create credentials — passed on every call per the auth2 pattern
+    credentials =
+        new CredentialCallOption(
+            new BasicAuthCredentialWriter(
+                SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD));
 
     // Wrap in FlightSqlClient for Flight SQL protocol operations
     flightSqlClient = new FlightSqlClient(flightClient);
@@ -123,8 +134,8 @@ public class IoTDBArrowFlightSqlIT {
     }
   }
 
-  @After
-  public void tearDown() throws Exception {
+  @AfterClass
+  public static void tearDownClass() throws Exception {
     if (flightSqlClient != null) {
       try {
         flightSqlClient.close();
@@ -149,10 +160,11 @@ public class IoTDBArrowFlightSqlIT {
   public void testQueryWithAllDataTypes() throws Exception {
     FlightInfo flightInfo =
         flightSqlClient.execute(
-            "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY 
time", bearerToken);
+            "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM " + TABLE + " ORDER 
BY time",
+            credentials);
 
     // Validate schema
-    Schema schema = flightInfo.getSchema();
+    Schema schema = flightInfo.getSchemaOptional().orElse(null);
     assertNotNull("Schema should not be null", schema);
     List<Field> fields = schema.getFields();
     assertEquals("Should have 8 columns", 8, fields.size());
@@ -166,7 +178,7 @@ public class IoTDBArrowFlightSqlIT {
   public void testQueryWithFilter() throws Exception {
     FlightInfo flightInfo =
         flightSqlClient.execute(
-            "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY 
time", bearerToken);
+            "SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY 
time", credentials);
 
     List<List<String>> rows = fetchAllRows(flightInfo);
     assertEquals("Should have 2 rows for device1", 2, rows.size());
@@ -177,8 +189,10 @@ public class IoTDBArrowFlightSqlIT {
     FlightInfo flightInfo =
         flightSqlClient.execute(
             "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum "
-                + "FROM test_table GROUP BY id1 ORDER BY id1",
-            bearerToken);
+                + "FROM "
+                + TABLE
+                + " GROUP BY id1 ORDER BY id1",
+            credentials);
 
     List<List<String>> rows = fetchAllRows(flightInfo);
     assertEquals("Should have 2 groups", 2, rows.size());
@@ -187,7 +201,8 @@ public class IoTDBArrowFlightSqlIT {
   @Test
   public void testEmptyResult() throws Exception {
     FlightInfo flightInfo =
-        flightSqlClient.execute("SELECT * FROM test_table WHERE id1 = 
'nonexistent'", bearerToken);
+        flightSqlClient.execute(
+            "SELECT * FROM " + TABLE + " WHERE id1 = 'nonexistent'", 
credentials);
 
     List<List<String>> rows = fetchAllRows(flightInfo);
     assertEquals("Should have 0 rows", 0, rows.size());
@@ -195,7 +210,7 @@ public class IoTDBArrowFlightSqlIT {
 
   @Test
   public void testShowDatabases() throws Exception {
-    FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", 
bearerToken);
+    FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", 
credentials);
 
     List<List<String>> rows = fetchAllRows(flightInfo);
     assertTrue("Should have at least 1 database", rows.size() >= 1);
@@ -219,7 +234,7 @@ public class IoTDBArrowFlightSqlIT {
   private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws 
Exception {
     List<List<String>> rows = new ArrayList<>();
     for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
-      try (FlightStream stream = 
flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) {
+      try (FlightStream stream = 
flightSqlClient.getStream(endpoint.getTicket(), credentials)) {
         while (stream.next()) {
           VectorSchemaRoot root = stream.getRoot();
           int rowCount = root.getRowCount();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6730138b2af..694a39618e7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -185,7 +185,8 @@ public class IoTDBDescriptor {
   public static URL getPropsUrl(String configFileName) {
     String urlString = commonDescriptor.getConfDir();
     if (urlString == null) {
-      // If urlString wasn't provided, try to find a default config in the 
root of the classpath.
+      // If urlString wasn't provided, try to find a default config in the 
root of the
+      // classpath.
       URL uri = IoTDBConfig.class.getResource("/" + configFileName);
       if (uri != null) {
         return uri;
@@ -204,7 +205,8 @@ public class IoTDBDescriptor {
       urlString += (File.separatorChar + configFileName);
     }
 
-    // If the url doesn't start with "file:" or "classpath:", it's provided as 
a no path.
+    // If the url doesn't start with "file:" or "classpath:", it's provided as 
a no
+    // path.
     // So we need to add it to make it a real URL.
     if (!urlString.startsWith("file:") && !urlString.startsWith("classpath:")) 
{
       urlString = "file:" + urlString;
@@ -880,6 +882,9 @@ public class IoTDBDescriptor {
     // mqtt
     loadMqttProps(properties);
 
+    // arrow flight sql
+    loadArrowFlightSqlProps(properties);
+
     conf.setIntoOperationBufferSizeInByte(
         Long.parseLong(
             properties.getProperty(
@@ -1724,7 +1729,8 @@ public class IoTDBDescriptor {
       }
     }
     newThrottleThreshold = (long) (newThrottleThreshold * dirUseProportion * 
walFileStores.size());
-    // the new throttle threshold should between MIN_THROTTLE_THRESHOLD and 
MAX_THROTTLE_THRESHOLD
+    // the new throttle threshold should between MIN_THROTTLE_THRESHOLD and
+    // MAX_THROTTLE_THRESHOLD
     return Math.max(Math.min(newThrottleThreshold, MAX_THROTTLE_THRESHOLD), 
MIN_THROTTLE_THRESHOLD);
   }
 
@@ -1942,6 +1948,20 @@ public class IoTDBDescriptor {
     }
   }
 
+  // Arrow Flight SQL related
+  private void loadArrowFlightSqlProps(TrimProperties properties) {
+    if (properties.getProperty(IoTDBConstant.ENABLE_ARROW_FLIGHT_SQL) != null) 
{
+      conf.setEnableArrowFlightSqlService(
+          Boolean.parseBoolean(
+              
properties.getProperty(IoTDBConstant.ENABLE_ARROW_FLIGHT_SQL).trim()));
+    }
+
+    if (properties.getProperty(IoTDBConstant.ARROW_FLIGHT_SQL_PORT) != null) {
+      conf.setArrowFlightSqlPort(
+          
Integer.parseInt(properties.getProperty(IoTDBConstant.ARROW_FLIGHT_SQL_PORT).trim()));
+    }
+  }
+
   // timed flush memtable
   private void loadTimedService(TrimProperties properties) throws IOException {
     conf.setEnableTimedFlushSeqMemtable(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index a94f472b606..b96389e0309 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -289,6 +289,10 @@ public class IoTDBConstant {
   public static final String MQTT_DATA_PATH = "mqtt_data_path";
   public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size";
 
+  // arrow flight sql
+  public static final String ENABLE_ARROW_FLIGHT_SQL = 
"enable_arrow_flight_sql_service";
+  public static final String ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port";
+
   // thrift
   public static final int DEFAULT_FETCH_SIZE = 5000;
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;

Reply via email to