This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 10b90ae6f2 [GLUTEN-9653][Flink] Support boolean in vector-to-row conversion (#9654)
10b90ae6f2 is described below
commit 10b90ae6f241cd48c5fec10b00f980ec457eaaf2
Author: lgbo <[email protected]>
AuthorDate: Mon May 19 09:28:28 2025 +0800
[GLUTEN-9653][Flink] Support boolean in vector-to-row conversion (#9654)
---
.../vectorized/FlinkRowToVLVectorConvertor.java | 36 +++++++++++++---------
.../stream/common/GlutenStreamingTestBase.java | 1 -
.../table/runtime/stream/custom/ScanTest.java | 4 +--
3 files changed, 24 insertions(+), 17 deletions(-)
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
index 06962e0449..264e33ec67 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
@@ -21,6 +21,7 @@ import io.github.zhztheplayer.velox4j.data.BaseVector;
import io.github.zhztheplayer.velox4j.data.RowVector;
import io.github.zhztheplayer.velox4j.session.Session;
import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.BooleanType;
import io.github.zhztheplayer.velox4j.type.IntegerType;
import io.github.zhztheplayer.velox4j.type.RowType;
import io.github.zhztheplayer.velox4j.type.TimestampType;
@@ -28,6 +29,7 @@ import io.github.zhztheplayer.velox4j.type.Type;
import io.github.zhztheplayer.velox4j.type.VarCharType;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TimeStampMilliVector;
@@ -40,12 +42,13 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
+
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
/** Converter between velox RowVector and Flink RowData. */
public class FlinkRowToVLVectorConvertor {
-
public static RowVector fromRowData(
RowData row,
BufferAllocator allocator,
@@ -132,26 +135,31 @@ public class FlinkRowToVLVectorConvertor {
RowType rowType) {
// TODO: support more types
BaseVector loadedVector = null;
- FieldVector fieldVector = null;
- try {
+ FieldVector structVector = null;
+ try{
loadedVector = rowVector.loadedVector();
- fieldVector = Arrow.toArrowVector(
- allocator,
- loadedVector);
+ // The result is a StructVector; its children hold the per-column vectors.
+ structVector = Arrow.toArrowVector(
+ allocator,
+ loadedVector);
+ final List<FieldVector> fieldVectors =
structVector.getChildrenFromFields();
List<RowData> rowDatas = new ArrayList<>(rowVector.getSize());
for (int j = 0; j < rowVector.getSize(); j++) {
List<Object> fieldValues = new ArrayList<>(rowType.size());
for (int i = 0; i < rowType.size(); i++) {
Type fieldType = rowType.getChildren().get(i);
- if (fieldType instanceof IntegerType) {
- fieldValues.add(i, ((IntVector)
fieldVector.getChildrenFromFields().get(i)).get(j));
+ if (fieldType instanceof BooleanType) {
+ // BitVector.get returns an int (1 or 0), so convert it to a boolean.
+ fieldValues.add(i, ((BitVector)
fieldVectors.get(i)).get(j) != 0);
+ } else if (fieldType instanceof IntegerType) {
+ fieldValues.add(i, ((IntVector)
fieldVectors.get(i)).get(j));
} else if (fieldType instanceof BigIntType) {
- fieldValues.add(i, ((BigIntVector)
fieldVector.getChildrenFromFields().get(i)).get(j));
+ fieldValues.add(i, ((BigIntVector)
fieldVectors.get(i)).get(j));
} else if (fieldType instanceof VarCharType) {
fieldValues.add(
- i,
- BinaryStringData.fromBytes(
- ((VarCharVector)
fieldVector.getChildrenFromFields().get(i)).get(j)));
+ i,
+ BinaryStringData.fromBytes(
+ ((VarCharVector)
fieldVectors.get(i)).get(j)));
} else {
throw new RuntimeException("Unsupported field type: "
+ fieldType);
}
@@ -162,8 +170,8 @@ public class FlinkRowToVLVectorConvertor {
} finally {
/// The FieldVector/BaseVector should be closed in `finally`, to
avoid it may not be closed when exceptions rasied,
/// that lead to memory leak.
- if (fieldVector != null) {
- fieldVector.close();
+ if (structVector != null) {
+ structVector.close();
}
if (loadedVector != null) {
loadedVector.close();
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
index 80a6c7fa87..7dd9466304 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -82,7 +82,6 @@ public class GlutenStreamingTestBase extends StreamingTestBase {
CollectionUtil.iteratorToList(tEnv().executeSql(query).collect()).stream()
.map(Object::toString)
.collect(Collectors.toList());
- actual.sort(String::compareTo);
assertThat(actual).isEqualTo(expected);
}
}
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
index a7844a6740..64ed5ba60d 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
@@ -45,9 +45,9 @@ class ScanTest extends GlutenStreamingTestBase {
@Test
void testFilter() {
- String query = "select a, b as b,c from MyTable where a > 0";
+ String query = "select a, b as b,c, a > 2 from MyTable where a > 0";
LOG.info("execution plan: {}", explainExecutionPlan(query));
- runAndCheck(query, Arrays.asList("+I[1, 1, 1]", "+I[2, 2, 2]", "+I[3,
3, 3]"));
+ runAndCheck(query, Arrays.asList("+I[1, 1, 1, false]", "+I[2, 2, 2,
false]", "+I[3, 3, 3, true]"));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]