This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 10b90ae6f2 [GLUTEN-9653][Flink] Support boolean in vector-to-row conversion (#9654)
10b90ae6f2 is described below

commit 10b90ae6f241cd48c5fec10b00f980ec457eaaf2
Author: lgbo <[email protected]>
AuthorDate: Mon May 19 09:28:28 2025 +0800

    [GLUTEN-9653][Flink] Support boolean in vector-to-row conversion (#9654)
---
 .../vectorized/FlinkRowToVLVectorConvertor.java    | 36 +++++++++++++---------
 .../stream/common/GlutenStreamingTestBase.java     |  1 -
 .../table/runtime/stream/custom/ScanTest.java      |  4 +--
 3 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
index 06962e0449..264e33ec67 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
@@ -21,6 +21,7 @@ import io.github.zhztheplayer.velox4j.data.BaseVector;
 import io.github.zhztheplayer.velox4j.data.RowVector;
 import io.github.zhztheplayer.velox4j.session.Session;
 import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.BooleanType;
 import io.github.zhztheplayer.velox4j.type.IntegerType;
 import io.github.zhztheplayer.velox4j.type.RowType;
 import io.github.zhztheplayer.velox4j.type.TimestampType;
@@ -28,6 +29,7 @@ import io.github.zhztheplayer.velox4j.type.Type;
 import io.github.zhztheplayer.velox4j.type.VarCharType;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.TimeStampMilliVector;
@@ -40,12 +42,13 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 
+
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 
 /** Converter between velox RowVector and Flink RowData. */
 public class FlinkRowToVLVectorConvertor {
-
     public static RowVector fromRowData(
             RowData row,
             BufferAllocator allocator,
@@ -132,26 +135,31 @@ public class FlinkRowToVLVectorConvertor {
             RowType rowType) {
         // TODO: support more types
         BaseVector loadedVector = null;
-        FieldVector fieldVector = null;
-        try {
+        FieldVector structVector = null;
+        try{
             loadedVector = rowVector.loadedVector();
-            fieldVector = Arrow.toArrowVector(
-                    allocator,
-                    loadedVector);
+            // The result is StructVector
+            structVector = Arrow.toArrowVector(
+                allocator,
+                loadedVector);
+            final List<FieldVector> fieldVectors = structVector.getChildrenFromFields();
             List<RowData> rowDatas = new ArrayList<>(rowVector.getSize());
             for (int j = 0; j < rowVector.getSize(); j++) {
                 List<Object> fieldValues = new ArrayList<>(rowType.size());
                 for (int i = 0; i < rowType.size(); i++) {
                     Type fieldType = rowType.getChildren().get(i);
-                    if (fieldType instanceof IntegerType) {
-                        fieldValues.add(i, ((IntVector) fieldVector.getChildrenFromFields().get(i)).get(j));
+                    if (fieldType instanceof BooleanType) {
+                        // BitVector returns are integer, need to convert to boolean
+                        fieldValues.add(i, ((BitVector) fieldVectors.get(i)).get(j) != 0);
+                    } else if (fieldType instanceof IntegerType) {
+                        fieldValues.add(i, ((IntVector) fieldVectors.get(i)).get(j));
                     } else if (fieldType instanceof BigIntType) {
-                        fieldValues.add(i, ((BigIntVector) fieldVector.getChildrenFromFields().get(i)).get(j));
+                        fieldValues.add(i, ((BigIntVector) fieldVectors.get(i)).get(j));
                     } else if (fieldType instanceof VarCharType) {
                         fieldValues.add(
-                                i,
-                                BinaryStringData.fromBytes(
-                                        ((VarCharVector) fieldVector.getChildrenFromFields().get(i)).get(j)));
+                            i,
+                            BinaryStringData.fromBytes(
+                                    ((VarCharVector) fieldVectors.get(i)).get(j)));
                     } else {
                         throw new RuntimeException("Unsupported field type: " + fieldType);
                     }
@@ -162,8 +170,8 @@ public class FlinkRowToVLVectorConvertor {
         } finally {
             /// The FieldVector/BaseVector should be closed in `finally`, to avoid it may not be closed when exceptions rasied,
             /// that lead to memory leak.
-            if (fieldVector != null) {
-                fieldVector.close();
+            if (structVector != null) {
+                structVector.close();
             }
             if (loadedVector != null) {
                 loadedVector.close();
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
index 80a6c7fa87..7dd9466304 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -82,7 +82,6 @@ public class GlutenStreamingTestBase extends StreamingTestBase {
                 CollectionUtil.iteratorToList(tEnv().executeSql(query).collect()).stream()
                         .map(Object::toString)
                         .collect(Collectors.toList());
-        actual.sort(String::compareTo);
         assertThat(actual).isEqualTo(expected);
     }
 }
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
index a7844a6740..64ed5ba60d 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
@@ -45,9 +45,9 @@ class ScanTest extends GlutenStreamingTestBase {
 
     @Test
     void testFilter() {
-        String query = "select a, b as b,c from MyTable where a > 0";
+        String query = "select a, b as b,c, a > 2 from MyTable where a > 0";
         LOG.info("execution plan: {}", explainExecutionPlan(query));
-        runAndCheck(query, Arrays.asList("+I[1, 1, 1]", "+I[2, 2, 2]", "+I[3, 3, 3]"));
+        runAndCheck(query, Arrays.asList("+I[1, 1, 1, false]", "+I[2, 2, 2, false]", "+I[3, 3, 3, true]"));
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to