This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ai-code/flight-sql in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 835b516cab32c6a9f3e1722581a900fa6b672222 Author: JackieTien97 <[email protected]> AuthorDate: Tue Feb 24 12:13:00 2026 +0800 partial fix --- .../apache/iotdb/flight/FlightSqlAuthHandler.java | 2 +- .../iotdb/flight/IoTDBFlightSqlProducer.java | 53 ++++++++++++--- integration-test/pom.xml | 7 ++ integration-test/src/assembly/mpp-share.xml | 4 ++ .../it/flightsql/IoTDBArrowFlightSqlIT.java | 77 +++++++++++++--------- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 26 +++++++- .../apache/iotdb/commons/conf/IoTDBConstant.java | 4 ++ 7 files changed, 130 insertions(+), 43 deletions(-) diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java index b591665eab0..cb8faee54b4 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java @@ -48,7 +48,7 @@ public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.Creden LOGGER.debug("Validating credentials for user: {}", username); try { - String token = sessionManager.authenticate(username, password, "unknown"); + String token = sessionManager.authenticate(username, password, "127.0.0.1"); // Return the token as the peer identity; GeneratedBearerTokenAuthenticator // wraps it in a Bearer token and sets it in the response header. return () -> token; diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java index 067ccfb20d4..fa8286db3bf 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java @@ -54,7 +54,6 @@ import org.apache.tsfile.read.common.block.TsBlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.Collections; import java.util.Optional; @@ -109,7 +108,7 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { public FlightInfo getFlightInfoStatement( FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) { String sql = command.getQuery(); - LOGGER.debug("getFlightInfoStatement: {}", sql); + LOGGER.warn("getFlightInfoStatement called with SQL: {}", sql); IClientSession session = getSessionFromContext(context); @@ -120,6 +119,18 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { // Parse the SQL statement Statement statement = sqlParser.createStatement(sql, ZoneId.systemDefault(), session); + // Handle USE database statement: set database on session and return empty + // result + if (statement instanceof org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use) { + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use useStmt = + (org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use) statement; + String dbName = useStmt.getDatabaseId().getValue(); + session.setDatabaseName(dbName); + LOGGER.info("Flight SQL session database set to: {}", dbName); + Schema emptySchema = new Schema(Collections.emptyList()); + return new FlightInfo(emptySchema, descriptor, Collections.emptyList(), -1, -1); + } + // Execute via Coordinator (Table model) ExecutionResult result = coordinator.executeForTableModel( @@ -153,20 +164,31 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { // Store the query context for later getStream calls activeQueries.put(queryId, new QueryContext(queryExecution, header, session)); - // Build ticket containing the queryId - byte[] ticketBytes = Long.toString(queryId).getBytes(StandardCharsets.UTF_8); - Ticket ticket = new Ticket(ticketBytes); + // Build ticket containing the queryId as a TicketStatementQuery protobuf. + // FlightSqlClient.getStream() expects this format to route DoGet to + // getStreamStatement(). + ByteString handle = ByteString.copyFromUtf8(Long.toString(queryId)); + FlightSql.TicketStatementQuery ticketQuery = + FlightSql.TicketStatementQuery.newBuilder().setStatementHandle(handle).build(); + Ticket ticket = new Ticket(com.google.protobuf.Any.pack(ticketQuery).toByteArray()); FlightEndpoint endpoint = new FlightEndpoint(ticket); return new FlightInfo(arrowSchema, descriptor, Collections.singletonList(endpoint), -1, -1); - } catch (RuntimeException e) { + } catch (Exception e) { // Cleanup on error if (queryId != null) { coordinator.cleanupQueryExecution(queryId); activeQueries.remove(queryId); } - throw e; + LOGGER.error("Error executing Flight SQL query: {}", sql, e); + if (e instanceof org.apache.arrow.flight.FlightRuntimeException) { + throw (org.apache.arrow.flight.FlightRuntimeException) e; + } + throw CallStatus.INTERNAL + .withDescription("Query execution error: " + e.getMessage()) + .withCause(e) + .toRuntimeException(); } } @@ -185,7 +207,16 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { ServerStreamListener listener) { ByteString handle = ticketQuery.getStatementHandle(); long queryId = Long.parseLong(handle.toStringUtf8()); - streamQueryResults(queryId, listener); + LOGGER.warn("getStreamStatement called for queryId={}", queryId); + try { + streamQueryResults(queryId, listener); + } catch (Exception e) { + LOGGER.error("getStreamStatement failed for queryId={}", queryId, e); + listener.error( + CallStatus.INTERNAL + .withDescription("getStreamStatement error: " + e.getMessage()) + .toRuntimeException()); + } } /** Streams query results for a given queryId as Arrow VectorSchemaRoot batches. */ @@ -220,6 +251,12 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { } catch (IoTDBException e) { LOGGER.error("Error streaming query results for queryId={}", queryId, e); listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException()); + } catch (Exception e) { + LOGGER.error("Unexpected error streaming query results for queryId={}", queryId, e); + listener.error( + CallStatus.INTERNAL + .withDescription("Streaming error: " + e.getMessage()) + .toRuntimeException()); } finally { coordinator.cleanupQueryExecution(queryId); activeQueries.remove(queryId); diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 4ef730957e4..06382ea9913 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -246,6 +246,13 @@ <!--We will integrate rest-jar-with-dependencies into lib by assembly plugin--> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>flight-sql</artifactId> + <version>2.0.7-SNAPSHOT</version> + <!--We will integrate flight-sql-jar-with-dependencies into lib by assembly plugin--> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.arrow</groupId> <artifactId>flight-sql</artifactId> diff --git a/integration-test/src/assembly/mpp-share.xml b/integration-test/src/assembly/mpp-share.xml index 70072e8282e..74de1ae8b23 100644 --- a/integration-test/src/assembly/mpp-share.xml +++ b/integration-test/src/assembly/mpp-share.xml @@ -39,5 +39,9 @@ <source>${project.basedir}/../external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source> <outputDirectory>lib</outputDirectory> </file> + <file> + <source>${project.basedir}/../external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source> + <outputDirectory>lib</outputDirectory> + </file> </files> </assembly> diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java index d931ca77999..b683df9a1c4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.relational.it.flightsql; import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.TableClusterIT; @@ -41,8 +43,8 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -50,49 +52,58 @@ import org.junit.runner.RunWith; import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Integration tests for Arrow Flight SQL service in IoTDB. Tests the end-to-end flow: client - * connects via Flight SQL protocol, authenticates, executes SQL queries, and receives - * Arrow-formatted results. + * connects via Flight SQL protocol, authenticates via auth2 Bearer token, executes SQL queries, and + * receives Arrow-formatted results. + * + * <p>Uses the standard auth2 pattern: ClientIncomingAuthHeaderMiddleware intercepts the first + * call's response to cache the Bearer token, which is then automatically sent on subsequent calls. + * All queries use fully qualified table names (database.table) for clarity. */ @RunWith(IoTDBTestRunner.class) @Category({TableLocalStandaloneIT.class, TableClusterIT.class}) public class IoTDBArrowFlightSqlIT { private static final String DATABASE = "flightsql_test_db"; - private static final String USER = "root"; - private static final String PASSWORD = "root"; + private static final String TABLE = DATABASE + ".test_table"; - private BufferAllocator allocator; - private FlightClient flightClient; - private FlightSqlClient flightSqlClient; - private CredentialCallOption bearerToken; + private static BufferAllocator allocator; + private static FlightClient flightClient; + private static FlightSqlClient flightSqlClient; + private static CredentialCallOption credentials; - @Before - public void setUp() throws Exception { + @BeforeClass + public static void setUpClass() throws Exception { // Configure and start the cluster with Arrow Flight SQL enabled BaseEnv baseEnv = EnvFactory.getEnv(); - baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true); + baseEnv.getConfig().getCommonConfig().setEnableArrowFlightSqlService(true); baseEnv.initClusterEnvironment(); // Get the Flight SQL port from the data node int port = EnvFactory.getEnv().getArrowFlightSqlPort(); - // Create Arrow allocator and Flight client with Bearer token auth middleware + // Create Arrow allocator and Flight client with Bearer token auth middleware. + // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from the + // server's + // response on the first authenticated call, and automatically attaches it to + // all + // subsequent calls — ensuring they reuse the same server-side session. allocator = new RootAllocator(Long.MAX_VALUE); Location location = Location.forGrpcInsecure("127.0.0.1", port); - - // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from the - // auth handshake ClientIncomingAuthHeaderMiddleware.Factory authFactory = new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); - flightClient = FlightClient.builder(allocator, location).intercept(authFactory).build(); - // Authenticate: sends Basic credentials, server returns Bearer token - bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); + // Create credentials — passed on every call per the auth2 pattern + credentials = + new CredentialCallOption( + new BasicAuthCredentialWriter( + SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD)); // Wrap in FlightSqlClient for Flight SQL protocol operations flightSqlClient = new FlightSqlClient(flightClient); @@ -123,8 +134,8 @@ public class IoTDBArrowFlightSqlIT { } } - @After - public void tearDown() throws Exception { + @AfterClass + public static void tearDownClass() throws Exception { if (flightSqlClient != null) { try { flightSqlClient.close(); @@ -149,10 +160,11 @@ public class IoTDBArrowFlightSqlIT { public void testQueryWithAllDataTypes() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY time", bearerToken); + "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM " + TABLE + " ORDER BY time", + credentials); // Validate schema - Schema schema = flightInfo.getSchema(); + Schema schema = flightInfo.getSchemaOptional().orElse(null); assertNotNull("Schema should not be null", schema); List<Field> fields = schema.getFields(); assertEquals("Should have 8 columns", 8, fields.size()); @@ -166,7 +178,7 @@ public class IoTDBArrowFlightSqlIT { public void testQueryWithFilter() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY time", bearerToken); + "SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY time", credentials); List<List<String>> rows = fetchAllRows(flightInfo); assertEquals("Should have 2 rows for device1", 2, rows.size()); @@ -177,8 +189,10 @@ public class IoTDBArrowFlightSqlIT { FlightInfo flightInfo = flightSqlClient.execute( "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum " - + "FROM test_table GROUP BY id1 ORDER BY id1", - bearerToken); + + "FROM " + + TABLE + + " GROUP BY id1 ORDER BY id1", + credentials); List<List<String>> rows = fetchAllRows(flightInfo); assertEquals("Should have 2 groups", 2, rows.size()); @@ -187,7 +201,8 @@ public class IoTDBArrowFlightSqlIT { @Test public void testEmptyResult() throws Exception { FlightInfo flightInfo = - flightSqlClient.execute("SELECT * FROM test_table WHERE id1 = 'nonexistent'", bearerToken); + flightSqlClient.execute( + "SELECT * FROM " + TABLE + " WHERE id1 = 'nonexistent'", credentials); List<List<String>> rows = fetchAllRows(flightInfo); assertEquals("Should have 0 rows", 0, rows.size()); @@ -195,7 +210,7 @@ public class IoTDBArrowFlightSqlIT { @Test public void testShowDatabases() throws Exception { - FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", bearerToken); + FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", credentials); List<List<String>> rows = fetchAllRows(flightInfo); assertTrue("Should have at least 1 database", rows.size() >= 1); @@ -219,7 +234,7 @@ public class IoTDBArrowFlightSqlIT { private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws Exception { List<List<String>> rows = new ArrayList<>(); for (FlightEndpoint endpoint : flightInfo.getEndpoints()) { - try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket(), bearerToken)) { + try (FlightStream stream = flightSqlClient.getStream(endpoint.getTicket(), credentials)) { while (stream.next()) { VectorSchemaRoot root = stream.getRoot(); int rowCount = root.getRowCount(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 6730138b2af..694a39618e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -185,7 +185,8 @@ public class IoTDBDescriptor { public static URL getPropsUrl(String configFileName) { String urlString = commonDescriptor.getConfDir(); if (urlString == null) { - // If urlString wasn't provided, try to find a default config in the root of the classpath. + // If urlString wasn't provided, try to find a default config in the root of the + // classpath. URL uri = IoTDBConfig.class.getResource("/" + configFileName); if (uri != null) { return uri; @@ -204,7 +205,8 @@ public class IoTDBDescriptor { urlString += (File.separatorChar + configFileName); } - // If the url doesn't start with "file:" or "classpath:", it's provided as a no path. + // If the url doesn't start with "file:" or "classpath:", it's provided as a no + // path. // So we need to add it to make it a real URL. if (!urlString.startsWith("file:") && !urlString.startsWith("classpath:")) { urlString = "file:" + urlString; @@ -880,6 +882,9 @@ public class IoTDBDescriptor { // mqtt loadMqttProps(properties); + // arrow flight sql + loadArrowFlightSqlProps(properties); + conf.setIntoOperationBufferSizeInByte( Long.parseLong( properties.getProperty( @@ -1724,7 +1729,8 @@ public class IoTDBDescriptor { } } newThrottleThreshold = (long) (newThrottleThreshold * dirUseProportion * walFileStores.size()); - // the new throttle threshold should between MIN_THROTTLE_THRESHOLD and MAX_THROTTLE_THRESHOLD + // the new throttle threshold should between MIN_THROTTLE_THRESHOLD and + // MAX_THROTTLE_THRESHOLD return Math.max(Math.min(newThrottleThreshold, MAX_THROTTLE_THRESHOLD), MIN_THROTTLE_THRESHOLD); } @@ -1942,6 +1948,20 @@ public class IoTDBDescriptor { } } + // Arrow Flight SQL related + private void loadArrowFlightSqlProps(TrimProperties properties) { + if (properties.getProperty(IoTDBConstant.ENABLE_ARROW_FLIGHT_SQL) != null) { + conf.setEnableArrowFlightSqlService( + Boolean.parseBoolean( + properties.getProperty(IoTDBConstant.ENABLE_ARROW_FLIGHT_SQL).trim())); + } + + if (properties.getProperty(IoTDBConstant.ARROW_FLIGHT_SQL_PORT) != null) { + conf.setArrowFlightSqlPort( + Integer.parseInt(properties.getProperty(IoTDBConstant.ARROW_FLIGHT_SQL_PORT).trim())); + } + } + // timed flush memtable private void loadTimedService(TrimProperties properties) throws IOException { conf.setEnableTimedFlushSeqMemtable( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index a94f472b606..b96389e0309 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -289,6 +289,10 @@ public class IoTDBConstant { public static final String MQTT_DATA_PATH = "mqtt_data_path"; public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size"; + // arrow flight sql + public static final String ENABLE_ARROW_FLIGHT_SQL = "enable_arrow_flight_sql_service"; + public static final String ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port"; + // thrift public static final int DEFAULT_FETCH_SIZE = 5000; public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
