This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d1de90ab [VL] Add helper function ColumnarBatches.toString and 
InternalRow toString (#6458)
6d1de90ab is described below

commit 6d1de90ab6e982c6b47a26f3aef209cc69584dbd
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Aug 20 08:49:30 2024 +0800

    [VL] Add helper function ColumnarBatches.toString and InternalRow toString 
(#6458)
---
 .../gluten/datasource/ArrowCSVFileFormat.scala     |  8 -----
 .../gluten/execution/RowToVeloxColumnarExec.scala  | 17 +++++++++++
 .../gluten/execution/VeloxColumnarToRowExec.scala  | 16 ++++++++++
 .../execution/ColumnarCachedBatchSerializer.scala  | 30 ++-----------------
 .../gluten/columnarbatch/ColumnarBatchTest.java    | 34 +++++++++++++++++++++-
 .../gluten/columnarbatch/ColumnarBatches.java      | 13 +++++++++
 .../org/apache/gluten/utils/InternalRowUtl.scala   | 34 ++++++++++++++++++++++
 .../org/apache/gluten/utils/InternalRowUtl.scala   | 34 ++++++++++++++++++++++
 .../org/apache/gluten/utils/InternalRowUtl.scala   | 34 ++++++++++++++++++++++
 .../org/apache/gluten/utils/InternalRowUtl.scala   | 33 +++++++++++++++++++++
 10 files changed, 216 insertions(+), 37 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index a8e65b053..5629811f4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -34,7 +34,6 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -310,16 +309,9 @@ object ArrowCSVFileFormat {
       schema: StructType,
       batchSize: Int,
       it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` 
happy
-    val numInputRows = new SQLMetric("numInputRows")
-    val numOutputBatches = new SQLMetric("numOutputBatches")
-    val convertTime = new SQLMetric("convertTime")
     val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
       it,
       schema,
-      numInputRows,
-      numOutputBatches,
-      convertTime,
       batchSize
     )
     veloxBatch
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 9ceb6b5b6..aa30cc80d 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -92,6 +92,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends 
RowToColumnarExecBas
 }
 
 object RowToVeloxColumnarExec {
+
+  def toColumnarBatchIterator(
+      it: Iterator[InternalRow],
+      schema: StructType,
+      columnBatchSize: Int): Iterator[ColumnarBatch] = {
+    val numInputRows = new SQLMetric("numInputRows")
+    val numOutputBatches = new SQLMetric("numOutputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    RowToVeloxColumnarExec.toColumnarBatchIterator(
+      it,
+      schema,
+      numInputRows,
+      numOutputBatches,
+      convertTime,
+      columnBatchSize)
+  }
+
   def toColumnarBatchIterator(
       it: Iterator[InternalRow],
       schema: StructType,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index d3fb9c3ff..4bd553b01 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -99,6 +99,22 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends 
ColumnarToRowExecBas
 }
 
 object VeloxColumnarToRowExec {
+
+  def toRowIterator(
+      batches: Iterator[ColumnarBatch],
+      output: Seq[Attribute]): Iterator[InternalRow] = {
+    val numOutputRows = new SQLMetric("numOutputRows")
+    val numInputBatches = new SQLMetric("numInputBatches")
+    val convertTime = new SQLMetric("convertTime")
+    toRowIterator(
+      batches,
+      output,
+      numOutputRows,
+      numInputBatches,
+      convertTime
+    )
+  }
+
   def toRowIterator(
       batches: Iterator[ColumnarBatch],
       output: Seq[Attribute],
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index db9e75a05..7f4235fdf 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,6 @@ import 
org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -134,22 +133,9 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with SQLConfHe
         conf)
     }
 
-    // note, these metrics are unused but just make `RowToVeloxColumnarExec` 
happy
-    val metrics = 
BackendsApiManager.getMetricsApiInstance.genRowToColumnarMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numInputRows = metrics("numInputRows")
-    val numOutputBatches = metrics("numOutputBatches")
-    val convertTime = metrics("convertTime")
     val numRows = conf.columnBatchSize
     val rddColumnarBatch = input.mapPartitions {
-      it =>
-        RowToVeloxColumnarExec.toColumnarBatchIterator(
-          it,
-          localSchema,
-          numInputRows,
-          numOutputBatches,
-          convertTime,
-          numRows)
+      it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, 
numRows)
     }
     convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, 
conf)
   }
@@ -169,22 +155,10 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with SQLConfHe
         conf)
     }
 
-    // note, these metrics are unused but just make `VeloxColumnarToRowExec` 
happy
-    val metrics = 
BackendsApiManager.getMetricsApiInstance.genColumnarToRowMetrics(
-      SparkSession.getActiveSession.orNull.sparkContext)
-    val numOutputRows = metrics("numOutputRows")
-    val numInputBatches = metrics("numInputBatches")
-    val convertTime = metrics("convertTime")
     val rddColumnarBatch =
       convertCachedBatchToColumnarBatch(input, cacheAttributes, 
selectedAttributes, conf)
     rddColumnarBatch.mapPartitions {
-      it =>
-        VeloxColumnarToRowExec.toRowIterator(
-          it,
-          selectedAttributes,
-          numOutputRows,
-          numInputBatches,
-          convertTime)
+      it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
     }
   }
 
diff --git 
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
 
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index 3b78a4067..54994ccd4 100644
--- 
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ 
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.execution.RowToVeloxColumnarExec;
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.test.VeloxBackendTestBase;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.TaskResources$;
@@ -30,6 +32,8 @@ import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.stream.StreamSupport;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatchTest extends VeloxBackendTestBase {
 
   @Test
@@ -95,7 +99,7 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
   public void testOffloadAndLoadReadRow() {
     TaskResources$.MODULE$.runUnsafe(
         () -> {
-          final int numRows = 100;
+          final int numRows = 20;
           final ColumnarBatch batch = newArrowBatch("a boolean, b int", 
numRows);
           final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) 
batch.column(0);
           final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) 
batch.column(1);
@@ -124,6 +128,34 @@ public class ColumnarBatchTest extends 
VeloxBackendTestBase {
         });
   }
 
+  @Test
+  public void testToString() {
+    TaskResources$.MODULE$.runUnsafe(
+        () -> {
+          final int numRows = 20;
+          final ColumnarBatch batch = newArrowBatch("a boolean, b int", 
numRows);
+          final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) 
batch.column(0);
+          final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) 
batch.column(1);
+          for (int j = 0; j < numRows; j++) {
+            col0.putBoolean(j, j % 2 == 0);
+            col1.putInt(j, 15 - j);
+          }
+          col1.putNull(numRows - 1);
+          StructType structType = new StructType();
+          structType = structType.add("a", DataTypes.BooleanType, true);
+          structType = structType.add("b", DataTypes.IntegerType, true);
+          ColumnarBatch veloxBatch =
+              RowToVeloxColumnarExec.toColumnarBatchIterator(
+                      JavaConverters.asScalaIterator(batch.rowIterator()), 
structType, numRows)
+                  .next();
+          Assert.assertEquals("[true,15]\n[false,14]", 
ColumnarBatches.toString(veloxBatch, 0, 2));
+          Assert.assertEquals(
+              "[true,-3]\n[false,null]", ColumnarBatches.toString(veloxBatch, 
18, 2));
+          veloxBatch.close();
+          return null;
+        });
+  }
+
   private static ColumnarBatch newArrowBatch(String schema, int numRows) {
     final ArrowWritableColumnVector[] columns =
         ArrowWritableColumnVector.allocateColumns(numRows, 
StructType.fromDDL(schema));
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index d00efd7b8..fd9c72c36 100644
--- 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -16,11 +16,13 @@
  */
 package org.apache.gluten.columnarbatch;
 
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
 import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.Runtimes;
 import org.apache.gluten.utils.ArrowAbiUtil;
 import org.apache.gluten.utils.ArrowUtil;
 import org.apache.gluten.utils.ImplicitClass;
+import org.apache.gluten.utils.InternalRowUtl;
 import org.apache.gluten.vectorized.ArrowWritableColumnVector;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,8 @@ import org.apache.arrow.c.Data;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.utils.SparkArrowUtil;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.vectorized.ColumnarBatchUtil;
@@ -39,6 +43,8 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import scala.collection.JavaConverters;
+
 public class ColumnarBatches {
 
   private ColumnarBatches() {}
@@ -353,4 +359,11 @@ public class ColumnarBatches {
   public static long getNativeHandle(ColumnarBatch batch) {
     return getIndicatorVector(batch).handle();
   }
+
+  public static String toString(ColumnarBatch batch, int start, int length) {
+    ColumnarBatch loadedBatch = 
ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
+    StructType type = 
SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
+    return InternalRowUtl.toString(
+        type, JavaConverters.asScalaIterator(loadedBatch.rowIterator()), 
start, length);
+  }
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..32d694371
--- /dev/null
+++ b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, 
length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..32d694371
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, 
length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..32d694371
--- /dev/null
+++ b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = RowEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, 
length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+
+}
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala 
b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..654e43cbd
--- /dev/null
+++ b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+  def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+    val encoder = ExpressionEncoder(struct).resolveAndBind()
+    val deserializer = encoder.createDeserializer()
+    rows.map(deserializer).mkString(System.lineSeparator())
+  }
+
+  def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, 
length: Int): String = {
+    toString(struct, rows.slice(start, start + length))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to