This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 745f1f39f  [VL] Fix Arrow ColumnarBatch cannot revoke rowIterator 
correctly (#6797)
745f1f39f is described below

commit 745f1f39fef57b6ec6883b9d4458116c29bfcdd9
Author: Jin Chengcheng <[email protected]>
AuthorDate: Fri Aug 16 15:28:43 2024 +0800

     [VL] Fix Arrow ColumnarBatch cannot revoke rowIterator correctly (#6797)
---
 .../gluten/columnarbatch/ColumnarBatchTest.java    | 33 ++++++++++
 .../gluten/columnarbatch/ColumnarBatches.java      | 36 ++---------
 .../spark/sql/vectorized/ColumnarBatchUtil.java    | 72 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 31 deletions(-)

diff --git 
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
 
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index cd2ac50d3..3b78a4067 100644
--- 
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ 
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -91,6 +91,39 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
         });
   }
 
+  @Test
+  public void testOffloadAndLoadReadRow() {
+    TaskResources$.MODULE$.runUnsafe(
+        () -> {
+          final int numRows = 100;
+          final ColumnarBatch batch = newArrowBatch("a boolean, b int", 
numRows);
+          final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) 
batch.column(0);
+          final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) 
batch.column(1);
+          for (int j = 0; j < numRows; j++) {
+            col0.putBoolean(j, j % 2 == 0);
+            col1.putInt(j, 15 - j);
+          }
+          col1.putNull(numRows - 1);
+          Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
+          final ColumnarBatch offloaded =
+              
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
+          Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
+          final ColumnarBatch loaded =
+              
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), 
offloaded);
+          Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
+          long cnt =
+              StreamSupport.stream(
+                      Spliterators.spliteratorUnknownSize(
+                          loaded.rowIterator(), Spliterator.ORDERED),
+                      false)
+                  .count();
+          Assert.assertEquals(numRows, cnt);
+          Assert.assertEquals(loaded.getRow(0).getInt(1), 15);
+          loaded.close();
+          return null;
+        });
+  }
+
   private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     final ArrowWritableColumnVector[] columns =
         ArrowWritableColumnVector.allocateColumns(numRows, 
StructType.fromDDL(schema));
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 543e6d4cf..d00efd7b8 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.columnarbatch;
 
-import org.apache.gluten.exception.GlutenException;
 import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.Runtimes;
 import org.apache.gluten.utils.ArrowAbiUtil;
@@ -34,24 +33,13 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarBatchUtil;
 
-import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 public class ColumnarBatches {
-  private static final Field FIELD_COLUMNS;
-
-  static {
-    try {
-      Field f = ColumnarBatch.class.getDeclaredField("columns");
-      f.setAccessible(true);
-      FIELD_COLUMNS = f;
-    } catch (NoSuchFieldException e) {
-      throw new GlutenException(e);
-    }
-  }
 
   private ColumnarBatches() {}
 
@@ -90,21 +78,6 @@ public class ColumnarBatches {
     return BatchType.HEAVY;
   }
 
-  private static void transferVectors(ColumnarBatch from, ColumnarBatch 
target) {
-    try {
-      if (target.numCols() != from.numCols()) {
-        throw new IllegalStateException();
-      }
-      final ColumnVector[] newVectors = new ColumnVector[from.numCols()];
-      for (int i = 0; i < target.numCols(); i++) {
-        newVectors[i] = from.column(i);
-      }
-      FIELD_COLUMNS.set(target, newVectors);
-    } catch (IllegalAccessException e) {
-      throw new GlutenException(e);
-    }
-  }
-
   /** Heavy batch: Data is readable from JVM and formatted as Arrow data. */
   public static boolean isHeavyBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.HEAVY;
@@ -201,8 +174,9 @@ public class ColumnarBatches {
       }
 
       // populate new vectors to input
-      transferVectors(output, input);
-      return input;
+      ColumnarBatchUtil.transferVectors(output, input);
+
+      return output;
     }
   }
 
@@ -236,7 +210,7 @@ public class ColumnarBatches {
       }
 
       // populate new vectors to input
-      transferVectors(output, input);
+      ColumnarBatchUtil.transferVectors(output, input);
       return input;
     }
   }
diff --git 
a/gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchUtil.java
 
b/gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchUtil.java
new file mode 100644
index 000000000..0e2c74813
--- /dev/null
+++ 
b/gluten-data/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchUtil.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.vectorized;
+
+import org.apache.gluten.columnarbatch.ColumnarBatches;
+import org.apache.gluten.exception.GlutenException;
+
+import java.lang.reflect.Field;
+
+public class ColumnarBatchUtil {
+
+  private static final Field FIELD_COLUMNS;
+  private static final Field FIELD_COLUMNAR_BATCH_ROW;
+
+  static {
+    try {
+      Field f = ColumnarBatch.class.getDeclaredField("columns");
+      f.setAccessible(true);
+      FIELD_COLUMNS = f;
+      Field row = ColumnarBatch.class.getDeclaredField("row");
+      row.setAccessible(true);
+      FIELD_COLUMNAR_BATCH_ROW = row;
+    } catch (NoSuchFieldException e) {
+      throw new GlutenException(e);
+    }
+  }
+
+  private static void setColumnarBatchRow(
+      ColumnarBatch from, ColumnVector[] columns, ColumnarBatch target) {
+    ColumnarBatchRow newRow = new ColumnarBatchRow(columns);
+    try {
+      ColumnarBatchRow row = (ColumnarBatchRow) 
FIELD_COLUMNAR_BATCH_ROW.get(from);
+      newRow.rowId = row.rowId;
+      FIELD_COLUMNAR_BATCH_ROW.set(target, newRow);
+    } catch (IllegalAccessException e) {
+      throw new GlutenException(e);
+    }
+  }
+
+  public static void transferVectors(ColumnarBatch from, ColumnarBatch target) 
{
+    try {
+      if (target.numCols() != from.numCols()) {
+        throw new IllegalStateException();
+      }
+      final ColumnVector[] newVectors = new ColumnVector[from.numCols()];
+      for (int i = 0; i < target.numCols(); i++) {
+        newVectors[i] = from.column(i);
+      }
+      FIELD_COLUMNS.set(target, newVectors);
+      // Light batch does not need the row.
+      if (ColumnarBatches.isHeavyBatch(target)) {
+        setColumnarBatchRow(from, newVectors, target);
+      }
+    } catch (IllegalAccessException e) {
+      throw new GlutenException(e);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to