This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0625a75d1 [VL] Minor follow-ups for PRs (#6693)
0625a75d1 is described below
commit 0625a75d1e73bf6f6223d47e30a564a54e3b7b82
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Aug 6 15:39:32 2024 +0800
[VL] Minor follow-ups for PRs (#6693)
---
.../org/apache/gluten/execution/TestOperator.scala | 19 ++---
cpp/velox/tests/CMakeLists.txt | 10 ++-
cpp/velox/tests/VeloxBatchResizerTest.cc | 85 ++++++++++++++++++++++
cpp/velox/utils/VeloxBatchResizer.cc | 15 +++-
.../scala/org/apache/gluten/GlutenConfig.scala | 27 +++----
.../apache/gluten/integration/SparkJvmOptions.java | 42 +++++++++++
.../gluten/integration/command/SparkRunModes.java | 1 +
tools/gluten-it/sbin/gluten-it.sh | 31 +++-----
8 files changed, 175 insertions(+), 55 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 5ca5087d9..2a2671926 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -900,12 +900,11 @@ class TestOperator extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPla
test("combine small batches before shuffle") {
val minBatchSize = 15
- val maxBatchSize = 100
withSQLConf(
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" ->
"true",
"spark.gluten.sql.columnar.maxBatchSize" -> "2",
-
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
- s"$minBatchSize~$maxBatchSize"
+
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize" ->
+ s"$minBatchSize"
) {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
@@ -921,16 +920,10 @@ class TestOperator extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPla
assert(metrics("numOutputRows").value == 27)
assert(metrics("numOutputBatches").value == 2)
}
- }
- test("split small batches before shuffle") {
- val minBatchSize = 1
- val maxBatchSize = 4
withSQLConf(
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" ->
"true",
- "spark.gluten.sql.columnar.maxBatchSize" -> "100",
-
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
- s"$minBatchSize~$maxBatchSize"
+ "spark.gluten.sql.columnar.maxBatchSize" -> "2"
) {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
@@ -939,12 +932,12 @@ class TestOperator extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPla
val ops = collect(df.queryExecution.executedPlan) { case p:
VeloxResizeBatchesExec => p }
assert(ops.size == 1)
val op = ops.head
- assert(op.minOutputBatchSize == minBatchSize)
+ assert(op.minOutputBatchSize == 1)
val metrics = op.metrics
assert(metrics("numInputRows").value == 27)
- assert(metrics("numInputBatches").value == 1)
+ assert(metrics("numInputBatches").value == 14)
assert(metrics("numOutputRows").value == 27)
- assert(metrics("numOutputBatches").value == 7)
+ assert(metrics("numOutputBatches").value == 14)
}
}
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index f7bc1cb13..b8ea12e94 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -39,9 +39,13 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc
FilePathGenerator.cc)
add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
# TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
add_velox_test(
- velox_operators_test SOURCES VeloxColumnarToRowTest.cc
- VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc
- VeloxColumnarBatchTest.cc)
+ velox_operators_test
+ SOURCES
+ VeloxColumnarToRowTest.cc
+ VeloxRowToColumnarTest.cc
+ VeloxColumnarBatchSerializerTest.cc
+ VeloxColumnarBatchTest.cc
+ VeloxBatchResizerTest.cc)
add_velox_test(
velox_plan_conversion_test
SOURCES
diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc
new file mode 100644
index 000000000..aecd52f92
--- /dev/null
+++ b/cpp/velox/tests/VeloxBatchResizerTest.cc
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <limits>
+
+#include "utils/VeloxBatchResizer.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+// Test-only iterator that replays a fixed, in-memory list of batches in order
+// and then returns nullptr once the list is exhausted.
+class ColumnarBatchArray : public ColumnarBatchIterator {
+ public:
+  // Taken by value (and non-const) so callers can move the list in; moving from a
+  // const parameter would silently fall back to copying every shared_ptr.
+  explicit ColumnarBatchArray(std::vector<std::shared_ptr<ColumnarBatch>> batches)
+      : batches_(std::move(batches)) {}
+
+  std::shared_ptr<ColumnarBatch> next() override {
+    if (cursor_ >= batches_.size()) {
+      return nullptr;
+    }
+    return batches_[cursor_++];
+  }
+
+ private:
+  const std::vector<std::shared_ptr<ColumnarBatch>> batches_;
+  // size_t so the bound check against batches_.size() is not a signed/unsigned comparison.
+  size_t cursor_ = 0;
+};
+
+class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBase {
+ protected:
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  // Builds a single-column INTEGER row vector with `numRows` rows, every value set to 1.
+  RowVectorPtr newVector(size_t numRows) {
+    auto constant = makeConstant(1, numRows);
+    return std::make_shared<RowVector>(
+        pool(), ROW({INTEGER()}), nullptr, numRows, std::vector<VectorPtr>{constant});
+  }
+
+  // Feeds one input batch per entry of `inSizes` (row counts) through a VeloxBatchResizer
+  // built with the given min / max output sizes, drains it, and asserts that the row
+  // counts of the emitted batches equal `outSizes`, in order.
+  void checkResize(
+      int32_t min,
+      int32_t max,
+      const std::vector<int32_t>& inSizes,
+      const std::vector<int32_t>& outSizes) {
+    std::vector<std::shared_ptr<ColumnarBatch>> inBatches;
+    inBatches.reserve(inSizes.size());
+    for (const auto size : inSizes) {
+      inBatches.push_back(std::make_shared<VeloxColumnarBatch>(newVector(size)));
+    }
+    VeloxBatchResizer resizer(pool(), min, max, std::make_unique<ColumnarBatchArray>(std::move(inBatches)));
+    std::vector<int32_t> actualOutSizes;
+    for (auto next = resizer.next(); next != nullptr; next = resizer.next()) {
+      actualOutSizes.push_back(next->numRows());
+    }
+    ASSERT_EQ(actualOutSizes, outSizes);
+  }
+};
+
+TEST_F(VeloxBatchResizerTest, sanity) {
+  // Combine only: small inputs are merged until at least `min` rows are buffered;
+  // whatever is left at the end is flushed as the final batch.
+  checkResize(100, std::numeric_limits<int32_t>::max(), {30, 50, 30, 40, 30}, {110, 70});
+  // Split only (min = 1): an input larger than `max` is sliced into chunks of at most `max` rows.
+  checkResize(1, 40, {10, 20, 50, 30, 40, 30}, {10, 20, 40, 10, 30, 40, 30});
+  checkResize(1, 39, {10, 20, 50, 30, 40, 30}, {10, 20, 39, 11, 30, 39, 1, 30});
+  // Combining and splitting in the same stream.
+  checkResize(40, 40, {10, 20, 50, 30, 40, 30}, {30, 40, 10, 30, 40, 30});
+  checkResize(39, 39, {10, 20, 50, 30, 40, 30}, {30, 39, 11, 30, 39, 1, 30});
+  checkResize(100, 200, {5, 900, 50}, {5, 200, 200, 200, 200, 100, 50});
+  checkResize(100, 200, {5, 900, 30, 80}, {5, 200, 200, 200, 200, 100, 110});
+  checkResize(100, 200, {5, 900, 700}, {5, 200, 200, 200, 200, 100, 200, 200, 200, 100});
+  // Non-positive sizes must be rejected when the resizer is constructed.
+  ASSERT_ANY_THROW(checkResize(0, 0, {}, {}));
+}
+} // namespace gluten
diff --git a/cpp/velox/utils/VeloxBatchResizer.cc
b/cpp/velox/utils/VeloxBatchResizer.cc
index 7b5146306..564292994 100644
--- a/cpp/velox/utils/VeloxBatchResizer.cc
+++ b/cpp/velox/utils/VeloxBatchResizer.cc
@@ -23,9 +23,7 @@ namespace {
class SliceRowVector : public ColumnarBatchIterator {
public:
SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in)
- : maxOutputBatchSize_(maxOutputBatchSize), in_(in) {
- GLUTEN_CHECK(in->size() > maxOutputBatchSize, "Invalid state");
- }
+ : maxOutputBatchSize_(maxOutputBatchSize), in_(in) {}
std::shared_ptr<ColumnarBatch> next() override {
int32_t remainingLength = in_->size() - cursor_;
@@ -55,7 +53,11 @@ gluten::VeloxBatchResizer::VeloxBatchResizer(
: pool_(pool),
minOutputBatchSize_(minOutputBatchSize),
maxOutputBatchSize_(maxOutputBatchSize),
- in_(std::move(in)) {}
+ in_(std::move(in)) {
+ GLUTEN_CHECK(
+ minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0,
+ "Either minOutputBatchSize or maxOutputBatchSize should be larger than
0");
+}
std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
if (next_) {
@@ -82,6 +84,11 @@ std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
auto nextRv = nextVb->getRowVector();
+ if (buffer->size() + nextRv->size() > maxOutputBatchSize_) {
+ GLUTEN_CHECK(next_ == nullptr, "Invalid state");
+ next_ = std::make_unique<SliceRowVector>(maxOutputBatchSize_, nextRv);
+ return std::make_shared<VeloxColumnarBatch>(buffer);
+ }
buffer->append(nextRv.get());
if (buffer->size() >= minOutputBatchSize_) {
// Buffer is full.
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ed7a81192..1310b75a1 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def columnarShuffleCompressionThreshold: Int =
conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
- // FIXME: Not clear: MIN or MAX ?
def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
def columnarToRowMemThreshold: Long =
@@ -329,12 +328,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxResizeBatchesShuffleInputRange: ResizeRange = {
val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
- val defaultRange: ResizeRange =
- ResizeRange((0.25 * standardSize).toInt.max(1), 4 * standardSize)
- conf
- .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE)
- .map(ResizeRange.parse)
- .getOrElse(defaultRange)
+ val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
+ val minSize = conf
+ .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+ .getOrElse(defaultMinSize)
+ ResizeRange(minSize, Int.MaxValue)
}
def chColumnarShuffleSpillThreshold: Long = {
@@ -1492,17 +1490,16 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
- val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE =
-
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range")
+ val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
+
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.internal()
.doc(
- s"The minimum and maximum batch sizes for shuffle. If the batch size
is " +
- s"smaller / bigger than minimum / maximum value, it will be combined
with other " +
- s"batches / split before sending to shuffle. Only functions when " +
+ s"The minimum batch size for shuffle. If size of an input batch is " +
+ s"smaller than the value, it will be combined with other " +
+ s"batches before sending to shuffle. Only functions when " +
s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} is set to true.
" +
- s"A valid value for the option is min~max. " +
- s"E.g., s.g.s.c.b.v.resizeBatches.shuffleInput.range=100~10000")
- .stringConf
+ s"Default value: 0.25 * <max batch size>")
+ .intConf
.createOptional
val COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD =
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java
new file mode 100644
index 000000000..85c0912fd
--- /dev/null
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.integration;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Reads the default JVM options exposed by Spark's launcher through reflection, so this module
+ * does not need a compile-time dependency on a Spark version that ships the class.
+ */
+public class SparkJvmOptions {
+  private static final String MODULE_OPTIONS_CLASS_NAME = "org.apache.spark.launcher.JavaModuleOptions";
+
+  /**
+   * Returns the string produced by {@code JavaModuleOptions.defaultModuleOptions()}.
+   *
+   * @return the option string, or {@code ""} when the class is not on the classpath
+   * @throws RuntimeException wrapping any other reflection failure
+   */
+  public static String read() {
+    try {
+      // Use the constant so the looked-up class name has a single source of truth.
+      final Class<?> clazz = Class.forName(MODULE_OPTIONS_CLASS_NAME);
+      final Method method = clazz.getMethod("defaultModuleOptions");
+      // Static method: invoked with a null receiver.
+      return (String) method.invoke(null);
+    } catch (ClassNotFoundException e) {
+      // Could happen in Spark 3.2 which doesn't have this class yet.
+      return "";
+    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Prints {@link #read()} to stdout so a shell script can capture it via command substitution. */
+  public static void main(String[] args) {
+    System.out.println(read());
+  }
+}
diff --git
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
index 6750b90e9..d186b5d0b 100644
---
a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
+++
b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
@@ -332,6 +332,7 @@ public final class SparkRunModes {
@Override
public Map<String, String> extraSparkConf() {
final Map<String, String> extras = new HashMap<>();
+ extras.put(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS,
"-Dio.netty.tryReflectionSetAccessible=true");
extras.put(SparkLauncher.EXECUTOR_CORES,
String.valueOf(resourceEnumeration.lcExecutorCores()));
extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm",
resourceEnumeration.lcExecutorHeapMem()));
extras.put("spark.memory.offHeap.enabled", "true");
diff --git a/tools/gluten-it/sbin/gluten-it.sh
b/tools/gluten-it/sbin/gluten-it.sh
index 00ff78e34..8c1a6413b 100755
--- a/tools/gluten-it/sbin/gluten-it.sh
+++ b/tools/gluten-it/sbin/gluten-it.sh
@@ -16,8 +16,6 @@
set -euf
-GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G
-XX:ErrorFile=/var/log/java/hs_err_pid%p.log"}
-
BASEDIR=$(dirname $0)
LIB_DIR=$BASEDIR/../package/target/lib
@@ -28,32 +26,25 @@ fi
JAR_PATH=$LIB_DIR/*
+SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp $JAR_PATH
org.apache.gluten.integration.SparkJvmOptions)
+
EMBEDDED_SPARK_HOME=$BASEDIR/../spark-home
+# We temporarily disallow setting these two variables by caller.
+SPARK_HOME=""
+SPARK_SCALA_VERSION=""
export SPARK_HOME=${SPARK_HOME:-$EMBEDDED_SPARK_HOME}
export SPARK_SCALA_VERSION=${SPARK_SCALA_VERSION:-'2.12'}
echo "SPARK_HOME set at [$SPARK_HOME]."
echo "SPARK_SCALA_VERSION set at [$SPARK_SCALA_VERSION]."
-$JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \
- -XX:+IgnoreUnrecognizedVMOptions \
- --add-opens=java.base/java.lang=ALL-UNNAMED \
- --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \
- --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
- --add-opens=java.base/java.io=ALL-UNNAMED \
- --add-opens=java.base/java.net=ALL-UNNAMED \
- --add-opens=java.base/java.nio=ALL-UNNAMED \
- --add-opens=java.base/java.util=ALL-UNNAMED \
- --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
- --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
- --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
- --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
- --add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
- --add-opens=java.base/sun.security.action=ALL-UNNAMED \
- --add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
- -Djdk.reflect.useDirectMethodHandle=false \
+GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G"}
+
+$JAVA_HOME/bin/java \
+ $SPARK_JVM_OPTIONS \
+ $GLUTEN_IT_JVM_ARGS \
+ -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \
-Dio.netty.tryReflectionSetAccessible=true \
-cp $JAR_PATH \
org.apache.gluten.integration.Cli $@
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]