This is an automated email from the ASF dual-hosted git repository. critas pushed a commit to branch ai-code/wx_flight-sql in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 22d734252d1563d3c0a62d92f24e3cc822d03f52 Author: CritasWang <[email protected]> AuthorDate: Sat Feb 14 12:26:29 2026 +0800 fix(flight-sql): resolve end-of-stream mid-frame error in Flight SQL integration test 修复 Flight SQL 集成测试中的 "end-of-stream mid-frame" HTTP/2 帧截断错误。 Root cause / 根本原因: The gRPC default thread pool executor fails to properly handle subsequent RPCs on the same HTTP/2 connection in the DataNode JVM environment, where standalone Netty JARs coexist with grpc-netty bundled in the fat jar. DataNode JVM 环境中,gRPC 默认线程池执行器无法正确处理同一 HTTP/2 连接上 的后续 RPC 调用。根因是类路径上独立的 Netty JAR 与 fat jar 中捆绑的 grpc-netty 产生冲突。 Fix / 修复方案: 1. directExecutor() — run gRPC handlers in the Netty event loop thread, bypassing the default executor's thread scheduling issues (关键修复) 2. flowControlWindow(1MB) — explicit HTTP/2 flow control prevents framing errors when duplicate Netty JARs coexist on the classpath 3. Exclude io.netty from fat jar POM — use standalone Netty JARs already on the DataNode classpath instead of bundling duplicates Additional bug fixes / 其他修复: - TsBlockToArrowConverter: fix NPE when getColumnNameIndexMap() returns null for SHOW DATABASES queries (回退到列索引) - FlightSqlAuthHandler: add null guards in authenticate() and appendToOutgoingHeaders() for CallHeaders with null internal maps - FlightSqlAuthHandler: rewrite as CallHeaderAuthenticator with Bearer token reuse and Basic auth fallback - FlightSqlSessionManager: add user token cache for session reuse - IoTDBFlightSqlProducer: handle non-query statements (USE, CREATE, etc.) by returning empty FlightInfo, use TicketStatementQuery protobuf format Test changes / 测试改动: - Use fully qualified table names (database.table) instead of USE statement to keep each test to one GetFlightInfo + one DoGet RPC per connection - All 5 integration tests pass: testShowDatabases, testQueryWithAllDataTypes, testQueryWithFilter, testQueryWithAggregation, testEmptyResult --- .gitignore | 5 + distribution/pom.xml | 6 + .../src/assembly/external-service-impl.xml | 4 + external-service-impl/flight-sql/pom.xml | 17 ++- .../apache/iotdb/flight/FlightSqlAuthHandler.java | 114 +++++++++++--- .../iotdb/flight/FlightSqlAuthMiddleware.java | 89 ----------- .../org/apache/iotdb/flight/FlightSqlService.java | 165 ++++++++++++--------- .../iotdb/flight/FlightSqlSessionManager.java | 85 +++++++---- .../iotdb/flight/IoTDBFlightSqlProducer.java | 101 ++++++++++--- .../iotdb/flight/TsBlockToArrowConverter.java | 5 +- .../iotdb/flight/TsBlockToArrowConverterTest.java | 4 +- integration-test/pom.xml | 7 + integration-test/src/assembly/mpp-share.xml | 4 + .../it/flightsql/IoTDBArrowFlightSqlIT.java | 68 ++++----- .../recent/informationschema/IoTDBServicesIT.java | 2 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 21 +++ pom.xml | 5 + 18 files changed, 446 insertions(+), 267 deletions(-) diff --git a/.gitignore b/.gitignore index 2c19b1b3a2c..af3e81ef32d 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,8 @@ iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/ # Relational Grammar ANTLR iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/.antlr/ + +# Claude Code +CLAUDE.md +.omc/ +.claude/ diff --git a/distribution/pom.xml b/distribution/pom.xml index f4773f6afad..0736e09fb45 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -65,6 +65,12 @@ <version>2.0.7-SNAPSHOT</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>flight-sql</artifactId> + <version>2.0.7-SNAPSHOT</version> + <scope>provided</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/distribution/src/assembly/external-service-impl.xml b/distribution/src/assembly/external-service-impl.xml index c743fa85597..120cdfbae28 100644 --- a/distribution/src/assembly/external-service-impl.xml +++ b/distribution/src/assembly/external-service-impl.xml @@ -51,5 +51,9 @@ <source>${maven.multiModuleProjectDirectory}/external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source> <outputDirectory>/</outputDirectory> </file> + <file> + <source>${maven.multiModuleProjectDirectory}/external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source> + <outputDirectory>/</outputDirectory> + </file> </files> </assembly> diff --git a/external-service-impl/flight-sql/pom.xml b/external-service-impl/flight-sql/pom.xml index be363c336b7..2e1a4c301a9 100644 --- a/external-service-impl/flight-sql/pom.xml +++ b/external-service-impl/flight-sql/pom.xml @@ -38,10 +38,25 @@ <dependency> <groupId>org.apache.arrow</groupId> <artifactId>flight-sql</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty-buffer-patch</artifactId> + </exclusion> + <!-- Exclude Netty: standalone Netty JARs already on DataNode classpath --> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.arrow</groupId> - <artifactId>arrow-memory-netty</artifactId> + <artifactId>arrow-memory-unsafe</artifactId> <scope>runtime</scope> </dependency> <!-- IoTDB dependencies (provided at runtime by DataNode classloader) --> diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java index b591665eab0..c22d935d41a 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthHandler.java @@ -19,23 +19,28 @@ package org.apache.iotdb.flight; +import org.apache.arrow.flight.CallHeaders; import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; +import org.apache.arrow.flight.auth2.CallHeaderAuthenticator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + /** - * Arrow Flight SQL credential validator using Arrow's built-in auth2 framework. Validates - * username/password credentials via IoTDB's SessionManager and returns a Bearer token string as the - * peer identity for subsequent requests. - * - * <p>Used with {@link BasicCallHeaderAuthenticator} and {@link - * org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator} to provide Basic → Bearer token - * authentication flow. + * Arrow Flight SQL authenticator that supports both Basic and Bearer token authentication. On the + * first call, Basic credentials are validated and a Bearer token is returned. On subsequent calls, + * the Bearer token is used to look up the existing session, avoiding creating a new session per + * call. */ -public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.CredentialValidator { +public class FlightSqlAuthHandler implements CallHeaderAuthenticator { private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthHandler.class); + private static final String AUTHORIZATION_HEADER = "authorization"; + private static final String BASIC_PREFIX = "Basic "; + private static final String BEARER_PREFIX = "Bearer "; + private final FlightSqlSessionManager sessionManager; public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) { @@ -43,17 +48,88 @@ public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.Creden } @Override - public org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult validate( - String username, String password) { - LOGGER.debug("Validating credentials for user: {}", username); - + public AuthResult authenticate(CallHeaders headers) { + Iterable<String> authHeaders; try { - String token = sessionManager.authenticate(username, password, "unknown"); - // Return the token as the peer identity; GeneratedBearerTokenAuthenticator - // wraps it in a Bearer token and sets it in the response header. - return () -> token; - } catch (SecurityException e) { - throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException(); + authHeaders = headers.getAll(AUTHORIZATION_HEADER); + } catch (NullPointerException e) { + throw CallStatus.UNAUTHENTICATED + .withDescription("Missing Authorization header (null header map)") + .toRuntimeException(); + } + + // First pass: check for Bearer token (reuse existing session) + String basicHeader = null; + if (authHeaders == null) { + throw CallStatus.UNAUTHENTICATED + .withDescription("Missing Authorization header") + .toRuntimeException(); + } + for (String authHeader : authHeaders) { + if (authHeader.startsWith(BEARER_PREFIX)) { + String token = authHeader.substring(BEARER_PREFIX.length()); + try { + sessionManager.getSessionByToken(token); + return bearerResult(token); + } catch (SecurityException e) { + // Bearer token invalid/expired, fall through to Basic auth + LOGGER.debug("Bearer token invalid, falling back to Basic auth"); + } + } else if (authHeader.startsWith(BASIC_PREFIX) && basicHeader == null) { + basicHeader = authHeader; + } } + + // Second pass: fall back to Basic auth (create new session) + if (basicHeader != null) { + String encoded = basicHeader.substring(BASIC_PREFIX.length()); + String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8); + int colonIdx = decoded.indexOf(':'); + if (colonIdx < 0) { + throw CallStatus.UNAUTHENTICATED + .withDescription("Invalid Basic credentials format") + .toRuntimeException(); + } + String username = decoded.substring(0, colonIdx); + String password = decoded.substring(colonIdx + 1); + + LOGGER.debug("Validating credentials for user: {}", username); + try { + String token = sessionManager.authenticate(username, password, "unknown"); + return bearerResult(token); + } catch (SecurityException e) { + throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException(); + } + } + + throw CallStatus.UNAUTHENTICATED + .withDescription("Missing or unsupported Authorization header") + .toRuntimeException(); + } + + /** + * Creates an AuthResult that sends the Bearer token back in response headers. The client's + * ClientIncomingAuthHeaderMiddleware captures this token for use on subsequent calls. + */ + private static AuthResult bearerResult(String token) { + return new AuthResult() { + @Override + public String getPeerIdentity() { + return token; + } + + @Override + public void appendToOutgoingHeaders(CallHeaders outgoingHeaders) { + if (outgoingHeaders == null) { + return; + } + try { + outgoingHeaders.insert(AUTHORIZATION_HEADER, BEARER_PREFIX + token); + } catch (NullPointerException e) { + // Some CallHeaders implementations have null internal maps for certain RPCs + LOGGER.debug("Could not append Bearer token to outgoing headers", e); + } + } + }; } } diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java deleted file mode 100644 index 2cf0d048556..00000000000 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlAuthMiddleware.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.flight; - -import org.apache.arrow.flight.CallHeaders; -import org.apache.arrow.flight.CallInfo; -import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.FlightServerMiddleware; -import org.apache.arrow.flight.RequestContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Flight Server middleware for handling Bearer token / Basic authentication. Supports initial login - * via Basic auth header (username:password), returning a Bearer token. Subsequent requests use the - * Bearer token. - */ -public class FlightSqlAuthMiddleware implements FlightServerMiddleware { - - private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthMiddleware.class); - - /** The middleware key used to retrieve this middleware in the CallContext. */ - public static final Key<FlightSqlAuthMiddleware> KEY = Key.of("flight-sql-auth-middleware"); - - private final CallHeaders incomingHeaders; - - FlightSqlAuthMiddleware(CallHeaders incomingHeaders) { - this.incomingHeaders = incomingHeaders; - } - - /** Returns the incoming call headers for session lookup. */ - public CallHeaders getCallHeaders() { - return incomingHeaders; - } - - @Override - public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { - // no-op: token is set during Handshake response, not here - } - - @Override - public void onCallCompleted(CallStatus status) { - // no-op - } - - @Override - public void onCallErrored(Throwable err) { - // no-op - } - - // ===================== Factory ===================== - - /** Factory that creates FlightSqlAuthMiddleware for each call. */ - public static class Factory implements FlightServerMiddleware.Factory<FlightSqlAuthMiddleware> { - - private final FlightSqlSessionManager sessionManager; - - public Factory(FlightSqlSessionManager sessionManager) { - this.sessionManager = sessionManager; - } - - @Override - public FlightSqlAuthMiddleware onCallStarted( - CallInfo callInfo, CallHeaders incomingHeaders, RequestContext context) { - return new FlightSqlAuthMiddleware(incomingHeaders); - } - - public FlightSqlSessionManager getSessionManager() { - return sessionManager; - } - } -} diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java index 96480449fcc..9ed91d503a1 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlService.java @@ -24,8 +24,6 @@ import org.apache.iotdb.externalservice.api.IExternalService; import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.Location; -import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator; -import org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.slf4j.Logger; @@ -46,89 +44,122 @@ public class FlightSqlService implements IExternalService { private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlService.class); private static final long SESSION_TIMEOUT_MINUTES = 30; + private final Object lifecycleLock = new Object(); private FlightServer flightServer; private BufferAllocator allocator; private FlightSqlSessionManager flightSessionManager; private IoTDBFlightSqlProducer producer; @Override - public void start() { - int port = IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort(); - LOGGER.info("Starting Arrow Flight SQL service on port {}", port); - - try { - // Create the root allocator for Arrow memory management - allocator = new RootAllocator(Long.MAX_VALUE); - - // Create session manager with TTL - flightSessionManager = new FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES); - - // Create the auth handler - FlightSqlAuthHandler authHandler = new FlightSqlAuthHandler(flightSessionManager); - - // Create the Flight SQL producer - producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager); - - // Build the Flight server with auth2 Bearer token authentication - Location location = Location.forGrpcInsecure("0.0.0.0", port); - flightServer = - FlightServer.builder(allocator, location, producer) - .headerAuthenticator( - new GeneratedBearerTokenAuthenticator( - new BasicCallHeaderAuthenticator(authHandler))) - .build(); - - flightServer.start(); - LOGGER.info( - "Arrow Flight SQL service started successfully on port {}", flightServer.getPort()); - } catch (IOException e) { - LOGGER.error("Failed to start Arrow Flight SQL service", e); - stop(); - throw new RuntimeException("Failed to start Arrow Flight SQL service", e); + public synchronized void start() { + synchronized (lifecycleLock) { + if (flightServer != null) { + LOGGER.warn("Arrow Flight SQL service already started"); + return; + } + + int port = IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlPort(); + LOGGER.info("Starting Arrow Flight SQL service on port {}", port); + + try { + // Create the root allocator for Arrow memory management with memory limit + long maxMemory = Runtime.getRuntime().maxMemory(); + long allocatorLimit = + Math.min( + IoTDBDescriptor.getInstance().getConfig().getArrowFlightSqlMaxAllocatorMemory(), + maxMemory / 4); + allocator = new RootAllocator(allocatorLimit); + LOGGER.info( + "Arrow allocator initialized with limit: {} bytes ({} MB)", + allocatorLimit, + allocatorLimit / (1024 * 1024)); + + Location location = Location.forGrpcInsecure("0.0.0.0", port); + + // Create session manager with TTL + flightSessionManager = new FlightSqlSessionManager(SESSION_TIMEOUT_MINUTES); + FlightSqlAuthHandler authHandler = new FlightSqlAuthHandler(flightSessionManager); + + // Create the Flight SQL producer + producer = new IoTDBFlightSqlProducer(allocator, flightSessionManager); + + flightServer = + FlightServer.builder(allocator, location, producer) + .headerAuthenticator(authHandler) + // Configure Netty server for DataNode JVM environment: + // - directExecutor: run gRPC handlers in the Netty event loop thread to + // avoid thread scheduling issues with the default executor + // - flowControlWindow: explicit HTTP/2 flow control prevents framing errors + // when standalone Netty JARs coexist on the classpath + .transportHint( + "grpc.builderConsumer", + (java.util.function.Consumer<io.grpc.netty.NettyServerBuilder>) + nsb -> { + nsb.directExecutor(); + nsb.initialFlowControlWindow(1048576); + nsb.flowControlWindow(1048576); + }) + .build(); + + flightServer.start(); + LOGGER.info( + "Arrow Flight SQL service started successfully on port {}", flightServer.getPort()); + } catch (IOException e) { + LOGGER.error("Failed to start Arrow Flight SQL service", e); + stop(); + throw new RuntimeException("Failed to start Arrow Flight SQL service", e); + } } } @Override - public void stop() { - LOGGER.info("Stopping Arrow Flight SQL service"); + public synchronized void stop() { + synchronized (lifecycleLock) { + if (flightServer == null) { + LOGGER.warn("Arrow Flight SQL service not started"); + return; + } - if (flightServer != null) { - try { - flightServer.shutdown(); - flightServer.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting for Flight server shutdown", e); - Thread.currentThread().interrupt(); + LOGGER.info("Stopping Arrow Flight SQL service"); + + if (flightServer != null) { try { - flightServer.close(); - } catch (Exception ex) { - LOGGER.warn("Error force-closing Flight server", ex); + flightServer.shutdown(); + flightServer.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while waiting for Flight server shutdown", e); + Thread.currentThread().interrupt(); + try { + flightServer.close(); + } catch (Exception ex) { + LOGGER.warn("Error force-closing Flight server", ex); + } + } catch (Exception e) { + LOGGER.warn("Error shutting down Flight server", e); } - } catch (Exception e) { - LOGGER.warn("Error shutting down Flight server", e); + flightServer = null; } - flightServer = null; - } - if (producer != null) { - try { - producer.close(); - } catch (Exception e) { - LOGGER.warn("Error closing Flight SQL producer", e); + if (producer != null) { + try { + producer.close(); + } catch (Exception e) { + LOGGER.warn("Error closing Flight SQL producer", e); + } + producer = null; } - producer = null; - } - if (flightSessionManager != null) { - flightSessionManager.close(); - flightSessionManager = null; - } + if (flightSessionManager != null) { + flightSessionManager.close(); + flightSessionManager = null; + } - if (allocator != null) { - allocator.close(); - allocator = null; - } + if (allocator != null) { + allocator.close(); + allocator = null; + } - LOGGER.info("Arrow Flight SQL service stopped"); + LOGGER.info("Arrow Flight SQL service stopped"); + } } } diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java index ad829888f22..0178dc2f1fe 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/FlightSqlSessionManager.java @@ -20,9 +20,11 @@ package org.apache.iotdb.flight; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.InternalClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.rpc.TSStatusCode; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -31,7 +33,9 @@ import org.apache.arrow.flight.CallHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.UUID; +import java.security.SecureRandom; +import java.time.ZoneId; +import java.util.Base64; import java.util.concurrent.TimeUnit; /** @@ -43,12 +47,16 @@ public class FlightSqlSessionManager { private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlSessionManager.class); private static final String AUTHORIZATION_HEADER = "authorization"; private static final String BEARER_PREFIX = "Bearer "; + private static final SecureRandom SECURE_RANDOM = new SecureRandom(); private final SessionManager sessionManager = SessionManager.getInstance(); /** Cache of Bearer token -> IClientSession with configurable TTL. */ private final Cache<String, IClientSession> tokenCache; + /** Cache of username -> Bearer token for session reuse with Basic auth on every call. */ + private final Cache<String, String> userTokenCache; + public FlightSqlSessionManager(long sessionTimeoutMinutes) { this.tokenCache = Caffeine.newBuilder() @@ -56,17 +64,22 @@ public class FlightSqlSessionManager { .removalListener( (String token, IClientSession session, RemovalCause cause) -> { if (session != null && cause != RemovalCause.REPLACED) { - LOGGER.info("Flight SQL session expired, closing: {}", session); - sessionManager.closeSession( - session, - queryId -> - org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance() - .cleanupQueryExecution(queryId), - false); - sessionManager.removeCurrSessionForMqtt(null); // handled via sessions map only + LOGGER.info("Flight SQL session expired: {}, cause: {}", session, cause); + try { + sessionManager.closeSession( + session, + queryId -> + org.apache.iotdb.db.queryengine.plan.Coordinator.getInstance() + .cleanupQueryExecution(queryId), + false); + } catch (Exception e) { + LOGGER.error("Error closing expired session", e); + } } }) .build(); + this.userTokenCache = + Caffeine.newBuilder().expireAfterAccess(sessionTimeoutMinutes, TimeUnit.MINUTES).build(); } /** @@ -79,34 +92,42 @@ public class FlightSqlSessionManager { * @throws SecurityException if authentication fails */ public String authenticate(String username, String password, String clientAddress) { - // Create a session for this client + // Check if this user already has an active session (reuse it) + String existingToken = userTokenCache.getIfPresent(username); + if (existingToken != null && tokenCache.getIfPresent(existingToken) != null) { + return existingToken; + } + + // Verify credentials (REST pattern) + try { + org.apache.iotdb.common.rpc.thrift.TSStatus status = + AuthorityChecker.checkUser(username, password); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Authentication failed for client: {}", clientAddress); + throw new SecurityException("Authentication failed: wrong username or password"); + } + } catch (SecurityException e) { + throw e; + } catch (Exception e) { + throw new SecurityException("Authentication failed", e); + } + + // Create and register session (REST pattern) IClientSession session = new InternalClientSession("FlightSQL-" + clientAddress); session.setSqlDialect(IClientSession.SqlDialect.TABLE); + sessionManager.registerSession(session); - // Register the session before login (MQTT pattern) - sessionManager.registerSessionForMqtt(session); - - // Use SessionManager's login method - org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp loginResp = - sessionManager.login( - session, - username, - password, - java.time.ZoneId.systemDefault().getId(), - SessionManager.CURRENT_RPC_VERSION, - IoTDBConstant.ClientVersion.V_1_0, - IClientSession.SqlDialect.TABLE); - - if (loginResp.getCode() != org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // Remove the session if login failed - sessionManager.removeCurrSessionForMqtt(null); - throw new SecurityException("Authentication failed: " + loginResp.getMessage()); - } + long userId = AuthorityChecker.getUserId(username).orElse(-1L); + sessionManager.supplySession( + session, userId, username, ZoneId.systemDefault(), IoTDBConstant.ClientVersion.V_1_0); - // Generate Bearer token and store in cache - String token = UUID.randomUUID().toString(); + // Generate cryptographically secure Bearer token (32 bytes = 256 bits) + byte[] tokenBytes = new byte[32]; + SECURE_RANDOM.nextBytes(tokenBytes); + String token = Base64.getUrlEncoder().withoutPadding().encodeToString(tokenBytes); tokenCache.put(token, session); - LOGGER.info("Flight SQL user '{}' authenticated, session: {}", username, session); + userTokenCache.put(username, token); + LOGGER.info("Flight SQL authentication successful for client: {}", clientAddress); return token; } diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java index 067ccfb20d4..1793a64abec 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java @@ -34,6 +34,10 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.rpc.TSStatusCode; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.Criteria; @@ -54,11 +58,10 @@ import org.apache.tsfile.read.common.block.TsBlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.Collections; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * Apache Arrow Flight SQL producer implementation for IoTDB. Handles SQL query execution via the @@ -76,8 +79,25 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { private final SqlParser sqlParser = new SqlParser(); private final Metadata metadata = LocalExecutionPlanner.getInstance().metadata; - /** Stores query execution context by queryId for streaming results via getStream. */ - private final ConcurrentHashMap<Long, QueryContext> activeQueries = new ConcurrentHashMap<>(); + /** + * Stores query execution context by queryId for streaming results via getStream. Uses Caffeine + * cache with TTL to prevent resource leaks when clients don't call getStream. + */ + private final Cache<Long, QueryContext> activeQueries = + Caffeine.newBuilder() + .expireAfterWrite(CONFIG.getQueryTimeoutThreshold(), TimeUnit.MILLISECONDS) + .removalListener( + (Long queryId, QueryContext ctx, RemovalCause cause) -> { + if (ctx != null && cause != RemovalCause.EXPLICIT) { + LOGGER.warn("Query {} evicted due to {}, cleaning up", queryId, cause); + try { + coordinator.cleanupQueryExecution(queryId); + } catch (Exception e) { + LOGGER.error("Error cleaning up evicted query {}", queryId, e); + } + } + }) + .build(); public IoTDBFlightSqlProducer( BufferAllocator allocator, FlightSqlSessionManager flightSessionManager) { @@ -105,14 +125,31 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { // ===================== SQL Query Execution ===================== + private static final int MAX_QUERY_LENGTH = 100_000; // 100KB + @Override public FlightInfo getFlightInfoStatement( FlightSql.CommandStatementQuery command, CallContext context, FlightDescriptor descriptor) { String sql = command.getQuery(); - LOGGER.debug("getFlightInfoStatement: {}", sql); + + // Validate query length + if (sql == null || sql.trim().isEmpty()) { + throw CallStatus.INVALID_ARGUMENT.withDescription("Empty SQL query").toRuntimeException(); + } + if (sql.length() > MAX_QUERY_LENGTH) { + throw CallStatus.INVALID_ARGUMENT + .withDescription("Query exceeds maximum length of " + MAX_QUERY_LENGTH + " characters") + .toRuntimeException(); + } IClientSession session = getSessionFromContext(context); + // Log query for audit (truncate if too long) + LOGGER.info( + "Executing query for user {}: {}", + session.getUsername(), + sql.substring(0, Math.min(sql.length(), 200))); + Long queryId = null; try { queryId = sessionManager.requestQueryId(); @@ -142,9 +179,10 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); if (queryExecution == null) { - throw CallStatus.INTERNAL - .withDescription("Query execution not found after execution") - .toRuntimeException(); + // Non-query statements (USE, CREATE, INSERT, etc.) don't produce a query execution. + // Return an empty FlightInfo with no endpoints. + return new FlightInfo( + new Schema(Collections.emptyList()), descriptor, Collections.emptyList(), 0, 0); } DatasetHeader header = queryExecution.getDatasetHeader(); @@ -153,20 +191,28 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { // Store the query context for later getStream calls activeQueries.put(queryId, new QueryContext(queryExecution, header, session)); - // Build ticket containing the queryId - byte[] ticketBytes = Long.toString(queryId).getBytes(StandardCharsets.UTF_8); - Ticket ticket = new Ticket(ticketBytes); + // Build ticket as a serialized TicketStatementQuery protobuf. + // The FlightSqlProducer base class's getStream() unpacks tickets as Any + // and dispatches to getStreamStatement(). + ByteString handle = ByteString.copyFromUtf8(Long.toString(queryId)); + FlightSql.TicketStatementQuery ticketQuery = + FlightSql.TicketStatementQuery.newBuilder().setStatementHandle(handle).build(); + Ticket ticket = new Ticket(Any.pack(ticketQuery).toByteArray()); FlightEndpoint endpoint = new FlightEndpoint(ticket); return new FlightInfo(arrowSchema, descriptor, Collections.singletonList(endpoint), -1, -1); - } catch (RuntimeException e) { + } catch (Exception e) { // Cleanup on error + LOGGER.error("Error executing query: {}", sql, e); if (queryId != null) { coordinator.cleanupQueryExecution(queryId); - activeQueries.remove(queryId); + activeQueries.invalidate(queryId); } - throw e; + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException(); } } @@ -184,17 +230,26 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { CallContext context, ServerStreamListener listener) { ByteString handle = ticketQuery.getStatementHandle(); - long queryId = Long.parseLong(handle.toStringUtf8()); + long queryId; + try { + queryId = Long.parseLong(handle.toStringUtf8()); + } catch (NumberFormatException e) { + listener.error( + CallStatus.INVALID_ARGUMENT + .withDescription("Invalid statement handle: " + handle.toStringUtf8()) + .toRuntimeException()); + return; + } streamQueryResults(queryId, listener); } /** Streams query results for a given queryId as Arrow VectorSchemaRoot batches. */ private void streamQueryResults(long queryId, ServerStreamListener listener) { - QueryContext ctx = activeQueries.get(queryId); + QueryContext ctx = activeQueries.getIfPresent(queryId); if (ctx == null) { listener.error( CallStatus.NOT_FOUND - .withDescription("Query not found for id: " + queryId) + .withDescription("Query not found or expired: " + queryId) .toRuntimeException()); return; } @@ -220,9 +275,15 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { } catch (IoTDBException e) { LOGGER.error("Error streaming query results for queryId={}", queryId, e); listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException()); + } catch (Exception e) { + LOGGER.error("Unexpected error streaming query results for queryId={}", queryId, e); + listener.error( + CallStatus.INTERNAL + .withDescription("Internal error: " + e.getMessage()) + .toRuntimeException()); } finally { coordinator.cleanupQueryExecution(queryId); - activeQueries.remove(queryId); + activeQueries.invalidate(queryId); if (root != null) { root.close(); } @@ -503,14 +564,14 @@ public class IoTDBFlightSqlProducer implements FlightSqlProducer { @Override public void close() throws Exception { // Clean up all active queries - for (Long queryId : activeQueries.keySet()) { + for (Long queryId : activeQueries.asMap().keySet()) { try { coordinator.cleanupQueryExecution(queryId); } catch (Exception e) { LOGGER.warn("Error cleaning up query {} during shutdown", queryId, e); } } - activeQueries.clear(); + activeQueries.invalidateAll(); } // ===================== Inner Classes ===================== diff --git a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java index 396f230d0b9..ca5532f13f1 100644 --- a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java +++ b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/TsBlockToArrowConverter.java @@ -122,7 +122,10 @@ public class TsBlockToArrowConverter { for (int colIdx = 0; colIdx < columnNames.size(); colIdx++) { String colName = columnNames.get(colIdx); - Integer sourceIdx = headerMap.get(colName); + int sourceIdx = + (headerMap != null && headerMap.containsKey(colName)) + ? headerMap.get(colName) + : colIdx; Column column = tsBlock.getColumn(sourceIdx); TSDataType dataType = dataTypes.get(colIdx); FieldVector fieldVector = root.getVector(colIdx); diff --git a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java index 413e17e23b7..f82cc398d80 100644 --- a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java +++ b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TsBlockToArrowConverterTest.java @@ -55,7 +55,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** Unit tests for TsBlockToArrowConverter. */ public class TsBlockToArrowConverterTest { diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 4ef730957e4..06382ea9913 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -246,6 +246,13 @@ <!--We will integrate rest-jar-with-dependencies into lib by assembly plugin--> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.iotdb</groupId> + <artifactId>flight-sql</artifactId> + <version>2.0.7-SNAPSHOT</version> + <!--We will integrate flight-sql-jar-with-dependencies into lib by assembly plugin--> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.arrow</groupId> <artifactId>flight-sql</artifactId> diff --git a/integration-test/src/assembly/mpp-share.xml b/integration-test/src/assembly/mpp-share.xml index 70072e8282e..74de1ae8b23 100644 --- a/integration-test/src/assembly/mpp-share.xml +++ b/integration-test/src/assembly/mpp-share.xml @@ -39,5 +39,9 @@ <source>${project.basedir}/../external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source> <outputDirectory>lib</outputDirectory> </file> + <file> + <source>${project.basedir}/../external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source> + <outputDirectory>lib</outputDirectory> + </file> </files> </assembly> diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java index d931ca77999..94d531c4d3d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java @@ -62,6 +62,7 @@ import static org.junit.Assert.*; public class IoTDBArrowFlightSqlIT { private static final String DATABASE = "flightsql_test_db"; + private static final String TABLE = DATABASE + ".test_table"; private static final String USER = "root"; private static final String PASSWORD = "root"; @@ -77,27 +78,17 @@ public class IoTDBArrowFlightSqlIT { baseEnv.getConfig().getDataNodeConfig().setEnableArrowFlightSqlService(true); baseEnv.initClusterEnvironment(); - // Get the Flight SQL port from the data node int port = EnvFactory.getEnv().getArrowFlightSqlPort(); - - // Create Arrow allocator and Flight client with Bearer token auth middleware allocator = new RootAllocator(Long.MAX_VALUE); Location location = Location.forGrpcInsecure("127.0.0.1", port); - // The ClientIncomingAuthHeaderMiddleware captures the Bearer token from the - // auth handshake ClientIncomingAuthHeaderMiddleware.Factory authFactory = new ClientIncomingAuthHeaderMiddleware.Factory(new ClientBearerHeaderHandler()); - flightClient = FlightClient.builder(allocator, location).intercept(authFactory).build(); - - // Authenticate: sends Basic credentials, server returns Bearer token - bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); - - // Wrap in FlightSqlClient for Flight SQL protocol operations flightSqlClient = new FlightSqlClient(flightClient); + bearerToken = new CredentialCallOption(new BasicAuthCredentialWriter(USER, PASSWORD)); - // Use the standard session to create the test database and table with data + // Create test data via native session (not Flight SQL) try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS " + DATABASE); } @@ -145,11 +136,31 @@ public class IoTDBArrowFlightSqlIT { EnvFactory.getEnv().cleanClusterEnvironment(); } + @Test + public void testShowDatabases() throws Exception { + FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", bearerToken); + + List<List<String>> rows = fetchAllRows(flightInfo); + assertTrue("Should have at least 1 database", rows.size() >= 1); + + boolean found = false; + for (List<String> row : rows) { + for (String val : row) { + if (val.contains(DATABASE)) { + found = true; + break; + } + } + } + assertTrue("Should find test database " + DATABASE, found); + } + @Test public void testQueryWithAllDataTypes() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM test_table ORDER BY time", bearerToken); + "SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM " + TABLE + " ORDER BY time", + bearerToken); // Validate schema Schema schema = flightInfo.getSchema(); @@ -166,7 +177,8 @@ public class IoTDBArrowFlightSqlIT { public void testQueryWithFilter() throws Exception { FlightInfo flightInfo = flightSqlClient.execute( - "SELECT id1, s1 FROM test_table WHERE id1 = 'device1' ORDER BY time", bearerToken); + "SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY time", + bearerToken); List<List<String>> rows = fetchAllRows(flightInfo); assertEquals("Should have 2 rows for device1", 2, rows.size()); @@ -177,7 +189,7 @@ public class IoTDBArrowFlightSqlIT { FlightInfo flightInfo = flightSqlClient.execute( "SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum " - + "FROM test_table GROUP BY id1 ORDER BY id1", + + "FROM " + TABLE + " GROUP BY id1 ORDER BY id1", bearerToken); List<List<String>> rows = fetchAllRows(flightInfo); @@ -187,34 +199,18 @@ public class IoTDBArrowFlightSqlIT { @Test public void testEmptyResult() throws Exception { FlightInfo flightInfo = - flightSqlClient.execute("SELECT * FROM test_table WHERE id1 = 'nonexistent'", bearerToken); + flightSqlClient.execute( + "SELECT * FROM " + TABLE + " WHERE id1 = 'nonexistent'", bearerToken); List<List<String>> rows = fetchAllRows(flightInfo); assertEquals("Should have 0 rows", 0, rows.size()); } - @Test - public void testShowDatabases() throws Exception { - FlightInfo flightInfo = flightSqlClient.execute("SHOW DATABASES", bearerToken); - - List<List<String>> rows = fetchAllRows(flightInfo); - assertTrue("Should have at least 1 database", rows.size() >= 1); - - boolean found = false; - for (List<String> row : rows) { - for (String val : row) { - if (val.contains(DATABASE)) { - found = true; - break; - } - } - } - assertTrue("Should find test database " + DATABASE, found); - } + // ===================== Helper Methods ===================== /** - * Fetches all rows from all endpoints in a FlightInfo. Each row is a list of string - * representations of the column values. + * Fetches all rows from all endpoints in a FlightInfo using the shared client. Each row is a list + * of string representations of the column values. */ private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws Exception { List<List<String>> rows = new ArrayList<>(); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java index 014586c5e12..c881479e83c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java @@ -67,7 +67,7 @@ public class IoTDBServicesIT { public void testQueryResult() { String[] retArray = new String[] { - "MQTT,1,STOPPED,", "REST,1,STOPPED,", + "FLIGHT_SQL,1,STOPPED,", "MQTT,1,STOPPED,", "REST,1,STOPPED,", }; // TableModel diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 3fef5d79fe1..e63fb54f66b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -112,6 +112,9 @@ public class IoTDBConfig { /** The Arrow Flight SQL service binding port. */ private int arrowFlightSqlPort = 8904; + /** The Arrow Flight SQL max allocator memory in bytes (default: 4GB). */ + private long arrowFlightSqlMaxAllocatorMemory = 4L * 1024 * 1024 * 1024; + /** The mqtt service binding host. */ private String mqttHost = "127.0.0.1"; @@ -2564,6 +2567,14 @@ public class IoTDBConfig { this.arrowFlightSqlPort = arrowFlightSqlPort; } + public long getArrowFlightSqlMaxAllocatorMemory() { + return arrowFlightSqlMaxAllocatorMemory; + } + + public void setArrowFlightSqlMaxAllocatorMemory(long arrowFlightSqlMaxAllocatorMemory) { + this.arrowFlightSqlMaxAllocatorMemory = arrowFlightSqlMaxAllocatorMemory; + } + public String getMqttHost() { return mqttHost; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 6730138b2af..3edd0910049 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -880,6 +880,9 @@ public class IoTDBDescriptor { // mqtt loadMqttProps(properties); + // Arrow Flight SQL + loadArrowFlightSqlProps(properties); + conf.setIntoOperationBufferSizeInByte( Long.parseLong( properties.getProperty( @@ -1942,6 +1945,24 @@ public class IoTDBDescriptor { } } + // Arrow Flight SQL related + private void loadArrowFlightSqlProps(TrimProperties properties) { + if (properties.getProperty("enable_arrow_flight_sql_service") != null) { + conf.setEnableArrowFlightSqlService( + Boolean.parseBoolean(properties.getProperty("enable_arrow_flight_sql_service").trim())); + } + + if (properties.getProperty("arrow_flight_sql_port") != null) { + conf.setArrowFlightSqlPort( + Integer.parseInt(properties.getProperty("arrow_flight_sql_port").trim())); + } + + if (properties.getProperty("arrow_flight_sql_max_allocator_memory") != null) { + conf.setArrowFlightSqlMaxAllocatorMemory( + Long.parseLong(properties.getProperty("arrow_flight_sql_max_allocator_memory").trim())); + } + } + // timed flush memtable private void loadTimedService(TrimProperties properties) throws IOException { conf.setEnableTimedFlushSeqMemtable( diff --git a/pom.xml b/pom.xml index 334b89200b5..a4e8c408250 100644 --- a/pom.xml +++ b/pom.xml @@ -759,6 +759,11 @@ <artifactId>arrow-memory-netty</artifactId> <version>${arrow.version}</version> </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-unsafe</artifactId> + <version>${arrow.version}</version> + </dependency> </dependencies> </dependencyManagement> <build>
