This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ai-code/flight-sql
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ai-code/flight-sql by this
push:
new da76dadf1dc partial fix
da76dadf1dc is described below
commit da76dadf1dccb3eb0482c3636f65fc0d48ac7b6b
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Feb 27 08:45:33 2026 +0800
partial fix
---
TestArrowOffset.class | Bin 0 -> 3134 bytes
external-service-impl/flight-sql/pom.xml | 45 ++++++++++++++----
.../iotdb/flight/IoTDBFlightSqlProducer.java | 51 +++++++++++++++++----
.../org/apache/iotdb/flight/TestArrowOffset.java | 38 +++++++++++++++
.../it/flightsql/IoTDBArrowFlightSqlIT.java | 22 +++++----
5 files changed, 127 insertions(+), 29 deletions(-)
diff --git a/TestArrowOffset.class b/TestArrowOffset.class
new file mode 100644
index 00000000000..fecab16726f
Binary files /dev/null and b/TestArrowOffset.class differ
diff --git a/external-service-impl/flight-sql/pom.xml
b/external-service-impl/flight-sql/pom.xml
index e1160323af6..bb9536ad987 100644
--- a/external-service-impl/flight-sql/pom.xml
+++ b/external-service-impl/flight-sql/pom.xml
@@ -41,8 +41,18 @@
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-netty</artifactId>
- <scope>runtime</scope>
+ <artifactId>flight-core</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${arrow.version}</version>
</dependency>
<!-- IoTDB dependencies (provided at runtime by DataNode classloader)
-->
<dependency>
@@ -51,6 +61,24 @@
<scope>provided</scope>
<version>2.0.7-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift</artifactId>
+ <scope>provided</scope>
+ <version>2.0.7-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>service-rpc</artifactId>
+ <scope>provided</scope>
+ <version>2.0.7-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift-commons</artifactId>
+ <scope>provided</scope>
+ <version>2.0.7-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
@@ -69,6 +97,11 @@
<version>${tsfile.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.25.1</version>
+ </dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
@@ -113,14 +146,6 @@
<configuration>
<ignoredDependencies>
<ignoredDependency>org.apache.tsfile:common</ignoredDependency>
-
<ignoredDependency>org.apache.iotdb:iotdb-thrift-commons</ignoredDependency>
-
<ignoredDependency>com.google.protobuf:protobuf-java</ignoredDependency>
-
<ignoredDependency>org.apache.iotdb:iotdb-thrift</ignoredDependency>
-
<ignoredDependency>org.apache.arrow:flight-core</ignoredDependency>
-
<ignoredDependency>org.apache.arrow:arrow-vector</ignoredDependency>
-
<ignoredDependency>org.apache.arrow:arrow-memory-core</ignoredDependency>
-
<ignoredDependency>org.apache.iotdb:service-rpc</ignoredDependency>
-
<ignoredDependency>org.apache.arrow:arrow-memory-netty</ignoredDependency>
</ignoredDependencies>
</configuration>
</plugin>
diff --git
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
index fa8286db3bf..4a4227ecb34 100644
---
a/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
+++
b/external-service-impl/flight-sql/src/main/java/org/apache/iotdb/flight/IoTDBFlightSqlProducer.java
@@ -210,7 +210,7 @@ public class IoTDBFlightSqlProducer implements
FlightSqlProducer {
LOGGER.warn("getStreamStatement called for queryId={}", queryId);
try {
streamQueryResults(queryId, listener);
- } catch (Exception e) {
+ } catch (Throwable e) {
LOGGER.error("getStreamStatement failed for queryId={}", queryId, e);
listener.error(
CallStatus.INTERNAL
@@ -230,28 +230,62 @@ public class IoTDBFlightSqlProducer implements
FlightSqlProducer {
return;
}
- VectorSchemaRoot root = null;
- try {
- root = TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header,
allocator);
+ try (VectorSchemaRoot root =
+ TsBlockToArrowConverter.createVectorSchemaRoot(ctx.header, allocator))
{
listener.start(root);
+ LOGGER.warn("streamQueryResults: listener started for queryId={}",
queryId);
+ int batchCount = 0;
while (true) {
+ LOGGER.warn("streamQueryResults: fetching batch {} for queryId={}",
batchCount, queryId);
Optional<TsBlock> optionalTsBlock =
ctx.queryExecution.getBatchResult();
- if (!optionalTsBlock.isPresent() || optionalTsBlock.get().isEmpty()) {
+ if (!optionalTsBlock.isPresent()) {
+ LOGGER.warn(
+ "streamQueryResults: optionalTsBlock not present for queryId={},
breaking", queryId);
break;
}
TsBlock tsBlock = optionalTsBlock.get();
- root.clear();
+ if (tsBlock.isEmpty()) {
+ LOGGER.warn("streamQueryResults: tsBlock isEmpty for queryId={},
continuing", queryId);
+ continue;
+ }
+
+ LOGGER.warn(
+ "streamQueryResults: filling root with batch {} ({} rows)",
+ batchCount,
+ tsBlock.getPositionCount());
TsBlockToArrowConverter.fillVectorSchemaRoot(root, tsBlock,
ctx.header);
listener.putNext();
+ LOGGER.warn("streamQueryResults: putNext done for batch {}",
batchCount);
+
+ while (!listener.isReady() && !listener.isCancelled()) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ if (listener.isCancelled()) {
+ LOGGER.warn("Flight stream cancelled by client for queryId={}",
queryId);
+ break;
+ }
+ batchCount++;
}
+ LOGGER.warn(
+ "streamQueryResults: completing listener for queryId={}, total
batches={}",
+ queryId,
+ batchCount);
+ // Detach buffers from root so it's not freed while gRPC sends the last
batch
+ root.allocateNew();
listener.completed();
} catch (IoTDBException e) {
LOGGER.error("Error streaming query results for queryId={}", queryId, e);
listener.error(CallStatus.INTERNAL.withDescription(e.getMessage()).toRuntimeException());
- } catch (Exception e) {
+ } catch (Throwable e) {
LOGGER.error("Unexpected error streaming query results for queryId={}",
queryId, e);
listener.error(
CallStatus.INTERNAL
@@ -260,9 +294,6 @@ public class IoTDBFlightSqlProducer implements
FlightSqlProducer {
} finally {
coordinator.cleanupQueryExecution(queryId);
activeQueries.remove(queryId);
- if (root != null) {
- root.close();
- }
}
}
diff --git
a/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TestArrowOffset.java
b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TestArrowOffset.java
new file mode 100644
index 00000000000..f7d68e5c9f9
--- /dev/null
+++
b/external-service-impl/flight-sql/src/test/java/org/apache/iotdb/flight/TestArrowOffset.java
@@ -0,0 +1,38 @@
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+
+public class TestArrowOffset {
+ public static void main(String[] args) {
+ try (BufferAllocator allocator = new RootAllocator();
+ VarCharVector vector = new VarCharVector("test",
FieldType.nullable(new ArrowType.Utf8()), allocator)) {
+ vector.allocateNew();
+
+ // Set index 0 to a string
+ vector.setSafe(0, "hello".getBytes());
+ // Set index 1 to null
+ vector.setNull(1);
+ // Set index 2 to another string
+ vector.setSafe(2, "world".getBytes());
+
+ vector.setValueCount(3);
+
+ System.out.println("Offset at 0: " +
vector.getOffsetBuffer().getInt(0));
+ System.out.println("Offset at 1: " +
vector.getOffsetBuffer().getInt(4));
+ System.out.println("Offset at 2: " +
vector.getOffsetBuffer().getInt(8));
+ System.out.println("Offset at 3: " +
vector.getOffsetBuffer().getInt(12));
+
+ VectorUnloader unloader = new VectorUnloader(new
org.apache.arrow.vector.VectorSchemaRoot(
+ java.util.Collections.singletonList(vector.getField()),
+ java.util.Collections.singletonList(vector),
+ 3));
+ try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
+ System.out.println("Record batch length: " +
batch.computeBodyLength());
+ }
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
index 1710c15adb2..a3c80256e9d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/flightsql/IoTDBArrowFlightSqlIT.java
@@ -163,6 +163,7 @@ public class IoTDBArrowFlightSqlIT {
Schema schema;
List<Field> fields;
// 1. Query with all data types
+ System.out.println("Executing query...");
flightInfo =
flightSqlClient.execute(
"SELECT time, id1, s1, s2, s3, s4, s5, s6 FROM " + TABLE + " ORDER
BY time",
@@ -174,6 +175,7 @@ public class IoTDBArrowFlightSqlIT {
rows = fetchAllRows(flightInfo);
assertEquals("Should have 3 rows", 3, rows.size());
// 2. Query with filter
+ System.out.println("Executing query...");
flightInfo =
flightSqlClient.execute(
"SELECT id1, s1 FROM " + TABLE + " WHERE id1 = 'device1' ORDER BY
time", credentials);
@@ -181,6 +183,7 @@ public class IoTDBArrowFlightSqlIT {
assertEquals("Should have 2 rows for device1", 2, rows.size());
// 3. Query with aggregation
+ System.out.println("Executing query...");
flightInfo =
flightSqlClient.execute(
"SELECT id1, COUNT(*) as cnt, SUM(s1) as s1_sum "
@@ -192,6 +195,7 @@ public class IoTDBArrowFlightSqlIT {
assertEquals("Should have 2 groups", 2, rows.size());
// 4. Empty result query
+ System.out.println("Executing query...");
flightInfo =
flightSqlClient.execute(
"SELECT * FROM " + TABLE + " WHERE id1 = 'nonexistent'",
credentials);
@@ -199,6 +203,7 @@ public class IoTDBArrowFlightSqlIT {
assertEquals("Should have 0 rows", 0, rows.size());
// 5. Show databases
+ System.out.println("Executing query...");
flightInfo = flightSqlClient.execute("SHOW DATABASES", credentials);
rows = fetchAllRows(flightInfo);
assertTrue("Should have at least 1 database", rows.size() >= 1);
@@ -222,17 +227,16 @@ public class IoTDBArrowFlightSqlIT {
private List<List<String>> fetchAllRows(FlightInfo flightInfo) throws
Exception {
List<List<String>> rows = new ArrayList<>();
for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
- try (FlightStream stream =
flightSqlClient.getStream(endpoint.getTicket())) {
+ try (FlightStream stream =
flightSqlClient.getStream(endpoint.getTicket(), credentials)) {
while (stream.next()) {
- try (VectorSchemaRoot root = stream.getRoot()) {
- for (int i = 0; i < root.getRowCount(); i++) {
- List<String> row = new ArrayList<>();
- for (FieldVector vector : root.getFieldVectors()) {
- Object value = vector.getObject(i);
- row.add(value == null ? "null" : value.toString());
- }
- rows.add(row);
+ VectorSchemaRoot root = stream.getRoot();
+ for (int i = 0; i < root.getRowCount(); i++) {
+ List<String> row = new ArrayList<>();
+ for (FieldVector vector : root.getFieldVectors()) {
+ Object value = vector.getObject(i);
+ row.add(value == null ? "null" : value.toString());
}
+ rows.add(row);
}
}
}