This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6d1de90ab [VL] Add helper function ColumnarBatches.toString and InternalRow toString (#6458)
6d1de90ab is described below
commit 6d1de90ab6e982c6b47a26f3aef209cc69584dbd
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Aug 20 08:49:30 2024 +0800
[VL] Add helper function ColumnarBatches.toString and InternalRow toString (#6458)
---
.../gluten/datasource/ArrowCSVFileFormat.scala | 8 -----
.../gluten/execution/RowToVeloxColumnarExec.scala | 17 +++++++++++
.../gluten/execution/VeloxColumnarToRowExec.scala | 16 ++++++++++
.../execution/ColumnarCachedBatchSerializer.scala | 30 ++-----------------
.../gluten/columnarbatch/ColumnarBatchTest.java | 34 +++++++++++++++++++++-
.../gluten/columnarbatch/ColumnarBatches.java | 13 +++++++++
.../org/apache/gluten/utils/InternalRowUtl.scala | 34 ++++++++++++++++++++++
.../org/apache/gluten/utils/InternalRowUtl.scala | 34 ++++++++++++++++++++++
.../org/apache/gluten/utils/InternalRowUtl.scala | 34 ++++++++++++++++++++++
.../org/apache/gluten/utils/InternalRowUtl.scala | 33 +++++++++++++++++++++
10 files changed, 216 insertions(+), 37 deletions(-)
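For context: the new API is a debugging helper that renders the leading rows of a (possibly offloaded) ColumnarBatch as text. A minimal usage sketch, assuming a ColumnarBatch named `batch` is in scope inside a Spark task (the variable name is illustrative):

    import org.apache.gluten.columnarbatch.ColumnarBatches

    // Render rows [0, 10) of the batch, one "[v1,v2,...]" line per row.
    println(ColumnarBatches.toString(batch, 0, 10))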
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index a8e65b053..5629811f4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFileLinesReader, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -310,16 +309,9 @@ object ArrowCSVFileFormat {
schema: StructType,
batchSize: Int,
it: Iterator[InternalRow]): Iterator[ColumnarBatch] = {
- // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
- val numInputRows = new SQLMetric("numInputRows")
- val numOutputBatches = new SQLMetric("numOutputBatches")
- val convertTime = new SQLMetric("convertTime")
val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
schema,
- numInputRows,
- numOutputBatches,
- convertTime,
batchSize
)
veloxBatch
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 9ceb6b5b6..aa30cc80d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -92,6 +92,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
}
object RowToVeloxColumnarExec {
+
+ def toColumnarBatchIterator(
+ it: Iterator[InternalRow],
+ schema: StructType,
+ columnBatchSize: Int): Iterator[ColumnarBatch] = {
+ val numInputRows = new SQLMetric("numInputRows")
+ val numOutputBatches = new SQLMetric("numOutputBatches")
+ val convertTime = new SQLMetric("convertTime")
+ RowToVeloxColumnarExec.toColumnarBatchIterator(
+ it,
+ schema,
+ numInputRows,
+ numOutputBatches,
+ convertTime,
+ columnBatchSize)
+ }
+
def toColumnarBatchIterator(
it: Iterator[InternalRow],
schema: StructType,
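The three-argument overload added above spares callers the metric plumbing by constructing placeholder SQLMetrics internally. A sketch of calling it, assuming `rows: Iterator[InternalRow]` and `schema: StructType` are already in scope (names illustrative):

    import org.apache.gluten.execution.RowToVeloxColumnarExec
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Only the data, the schema, and the target batch size are required now.
    val batches: Iterator[ColumnarBatch] =
      RowToVeloxColumnarExec.toColumnarBatchIterator(rows, schema, 4096)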
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index d3fb9c3ff..4bd553b01 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -99,6 +99,22 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
}
object VeloxColumnarToRowExec {
+
+ def toRowIterator(
+ batches: Iterator[ColumnarBatch],
+ output: Seq[Attribute]): Iterator[InternalRow] = {
+ val numOutputRows = new SQLMetric("numOutputRows")
+ val numInputBatches = new SQLMetric("numInputBatches")
+ val convertTime = new SQLMetric("convertTime")
+ toRowIterator(
+ batches,
+ output,
+ numOutputRows,
+ numInputBatches,
+ convertTime
+ )
+ }
+
def toRowIterator(
batches: Iterator[ColumnarBatch],
output: Seq[Attribute],
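The columnar-to-row direction gets the same treatment. A sketch under the same assumptions, with `batches: Iterator[ColumnarBatch]` and the plan's `output: Seq[Attribute]` in scope (names illustrative):

    import org.apache.gluten.execution.VeloxColumnarToRowExec
    import org.apache.spark.sql.catalyst.InternalRow

    // The two-argument overload fabricates throwaway metrics internally.
    val rows: Iterator[InternalRow] =
      VeloxColumnarToRowExec.toRowIterator(batches, output)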
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index db9e75a05..7f4235fdf 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -28,7 +28,6 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -134,22 +133,9 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
conf)
}
- // note, these metrics are unused but just make `RowToVeloxColumnarExec` happy
- val metrics = BackendsApiManager.getMetricsApiInstance.genRowToColumnarMetrics(
- SparkSession.getActiveSession.orNull.sparkContext)
- val numInputRows = metrics("numInputRows")
- val numOutputBatches = metrics("numOutputBatches")
- val convertTime = metrics("convertTime")
val numRows = conf.columnBatchSize
val rddColumnarBatch = input.mapPartitions {
- it =>
- RowToVeloxColumnarExec.toColumnarBatchIterator(
- it,
- localSchema,
- numInputRows,
- numOutputBatches,
- convertTime,
- numRows)
+ it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, numRows)
}
convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
}
@@ -169,22 +155,10 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
conf)
}
- // note, these metrics are unused but just make `VeloxColumnarToRowExec` happy
- val metrics = BackendsApiManager.getMetricsApiInstance.genColumnarToRowMetrics(
- SparkSession.getActiveSession.orNull.sparkContext)
- val numOutputRows = metrics("numOutputRows")
- val numInputBatches = metrics("numInputBatches")
- val convertTime = metrics("convertTime")
val rddColumnarBatch =
convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
rddColumnarBatch.mapPartitions {
- it =>
- VeloxColumnarToRowExec.toRowIterator(
- it,
- selectedAttributes,
- numOutputRows,
- numInputBatches,
- convertTime)
+ it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
}
}
diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index 3b78a4067..54994ccd4 100644
--- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.gluten.columnarbatch;
+import org.apache.gluten.execution.RowToVeloxColumnarExec;
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.test.VeloxBackendTestBase;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;
+import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.TaskResources$;
@@ -30,6 +32,8 @@ import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;
+import scala.collection.JavaConverters;
+
public class ColumnarBatchTest extends VeloxBackendTestBase {
@Test
@@ -95,7 +99,7 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
public void testOffloadAndLoadReadRow() {
TaskResources$.MODULE$.runUnsafe(
() -> {
- final int numRows = 100;
+ final int numRows = 20;
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
@@ -124,6 +128,34 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
});
}
+ @Test
+ public void testToString() {
+ TaskResources$.MODULE$.runUnsafe(
+ () -> {
+ final int numRows = 20;
+ final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
+ final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch.column(0);
+ final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch.column(1);
+ for (int j = 0; j < numRows; j++) {
+ col0.putBoolean(j, j % 2 == 0);
+ col1.putInt(j, 15 - j);
+ }
+ col1.putNull(numRows - 1);
+ StructType structType = new StructType();
+ structType = structType.add("a", DataTypes.BooleanType, true);
+ structType = structType.add("b", DataTypes.IntegerType, true);
+ ColumnarBatch veloxBatch =
+ RowToVeloxColumnarExec.toColumnarBatchIterator(
+ JavaConverters.asScalaIterator(batch.rowIterator()), structType, numRows)
+ .next();
+ Assert.assertEquals("[true,15]\n[false,14]",
ColumnarBatches.toString(veloxBatch, 0, 2));
+ Assert.assertEquals(
+ "[true,-3]\n[false,null]", ColumnarBatches.toString(veloxBatch, 18, 2));
+ veloxBatch.close();
+ return null;
+ });
+ }
+
private static ColumnarBatch newArrowBatch(String schema, int numRows) {
final ArrowWritableColumnVector[] columns =
ArrowWritableColumnVector.allocateColumns(numRows, StructType.fromDDL(schema));
diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index d00efd7b8..fd9c72c36 100644
--- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -16,11 +16,13 @@
*/
package org.apache.gluten.columnarbatch;
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.utils.ArrowAbiUtil;
import org.apache.gluten.utils.ArrowUtil;
import org.apache.gluten.utils.ImplicitClass;
+import org.apache.gluten.utils.InternalRowUtl;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;
import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,8 @@ import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.utils.SparkArrowUtil;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarBatchUtil;
@@ -39,6 +43,8 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import scala.collection.JavaConverters;
+
public class ColumnarBatches {
private ColumnarBatches() {}
@@ -353,4 +359,11 @@ public class ColumnarBatches {
public static long getNativeHandle(ColumnarBatch batch) {
return getIndicatorVector(batch).handle();
}
+
+ public static String toString(ColumnarBatch batch, int start, int length) {
+ ColumnarBatch loadedBatch = ensureLoaded(ArrowBufferAllocators.contextInstance(), batch);
+ StructType type = SparkArrowUtil.fromArrowSchema(ArrowUtil.toSchema(loadedBatch));
+ return InternalRowUtl.toString(
+ type, JavaConverters.asScalaIterator(loadedBatch.rowIterator()), start, length);
+ }
}
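The helper first materializes any offloaded batch on the Java heap (ensureLoaded), recovers the Spark schema from the batch's Arrow schema, then delegates rendering to InternalRowUtl. Judging from the expected strings in the test above, the output is one bracketed row per line:

    // Assuming `veloxBatch` holds the 20 test rows built above
    // (column a alternates true/false, column b counts down from 15):
    ColumnarBatches.toString(veloxBatch, 0, 2)  // "[true,15]\n[false,14]"
    ColumnarBatches.toString(veloxBatch, 18, 2) // "[true,-3]\n[false,null]"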
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..32d694371
--- /dev/null
+++ b/shims/spark32/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+ def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+ val encoder = RowEncoder(struct).resolveAndBind()
+ val deserializer = encoder.createDeserializer()
+ rows.map(deserializer).mkString(System.lineSeparator())
+ }
+
+ def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+ toString(struct, rows.slice(start, start + length))
+ }
+
+}
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..32d694371
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+ def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+ val encoder = RowEncoder(struct).resolveAndBind()
+ val deserializer = encoder.createDeserializer()
+ rows.map(deserializer).mkString(System.lineSeparator())
+ }
+
+ def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+ toString(struct, rows.slice(start, start + length))
+ }
+
+}
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..32d694371
--- /dev/null
+++ b/shims/spark34/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+ def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+ val encoder = RowEncoder(struct).resolveAndBind()
+ val deserializer = encoder.createDeserializer()
+ rows.map(deserializer).mkString(System.lineSeparator())
+ }
+
+ def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+ toString(struct, rows.slice(start, start + length))
+ }
+
+}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
new file mode 100644
index 000000000..654e43cbd
--- /dev/null
+++ b/shims/spark35/src/main/scala/org/apache/gluten/utils/InternalRowUtl.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.types.StructType
+
+object InternalRowUtl {
+ def toString(struct: StructType, rows: Iterator[InternalRow]): String = {
+ val encoder = ExpressionEncoder(struct).resolveAndBind()
+ val deserializer = encoder.createDeserializer()
+ rows.map(deserializer).mkString(System.lineSeparator())
+ }
+
+ def toString(struct: StructType, rows: Iterator[InternalRow], start: Int, length: Int): String = {
+ toString(struct, rows.slice(start, start + length))
+ }
+}
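Four near-identical copies of InternalRowUtl live under the version-specific shims because the encoder entry point differs: the Spark 3.2/3.3/3.4 shims use RowEncoder(struct), while the Spark 3.5 shim uses ExpressionEncoder(struct). Call sites look the same either way; a minimal sketch, assuming `schema: StructType` and `rows: Iterator[InternalRow]` are in scope (names illustrative):

    import org.apache.gluten.utils.InternalRowUtl

    // Render rows [0, 5) as one line per row, e.g. "[1,foo]".
    val text: String = InternalRowUtl.toString(schema, rows, 0, 5)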