This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0625a75d1 [VL] Minor follow-ups for PRs (#6693)
0625a75d1 is described below

commit 0625a75d1e73bf6f6223d47e30a564a54e3b7b82
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Aug 6 15:39:32 2024 +0800

    [VL] Minor follow-ups for PRs (#6693)
---
 .../org/apache/gluten/execution/TestOperator.scala | 19 ++---
 cpp/velox/tests/CMakeLists.txt                     | 10 ++-
 cpp/velox/tests/VeloxBatchResizerTest.cc           | 85 ++++++++++++++++++++++
 cpp/velox/utils/VeloxBatchResizer.cc               | 15 +++-
 .../scala/org/apache/gluten/GlutenConfig.scala     | 27 +++----
 .../apache/gluten/integration/SparkJvmOptions.java | 42 +++++++++++
 .../gluten/integration/command/SparkRunModes.java  |  1 +
 tools/gluten-it/sbin/gluten-it.sh                  | 31 +++-----
 8 files changed, 175 insertions(+), 55 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 5ca5087d9..2a2671926 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -900,12 +900,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
 
   test("combine small batches before shuffle") {
     val minBatchSize = 15
-    val maxBatchSize = 100
     withSQLConf(
       "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> 
"true",
       "spark.gluten.sql.columnar.maxBatchSize" -> "2",
-      
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
-        s"$minBatchSize~$maxBatchSize"
+      
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize" ->
+        s"$minBatchSize"
     ) {
       val df = runQueryAndCompare(
         "select l_orderkey, sum(l_partkey) as sum from lineitem " +
@@ -921,16 +920,10 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
       assert(metrics("numOutputRows").value == 27)
       assert(metrics("numOutputBatches").value == 2)
     }
-  }
 
-  test("split small batches before shuffle") {
-    val minBatchSize = 1
-    val maxBatchSize = 4
     withSQLConf(
       "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> 
"true",
-      "spark.gluten.sql.columnar.maxBatchSize" -> "100",
-      
"spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range" ->
-        s"$minBatchSize~$maxBatchSize"
+      "spark.gluten.sql.columnar.maxBatchSize" -> "2"
     ) {
       val df = runQueryAndCompare(
         "select l_orderkey, sum(l_partkey) as sum from lineitem " +
@@ -939,12 +932,12 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
       val ops = collect(df.queryExecution.executedPlan) { case p: 
VeloxResizeBatchesExec => p }
       assert(ops.size == 1)
       val op = ops.head
-      assert(op.minOutputBatchSize == minBatchSize)
+      assert(op.minOutputBatchSize == 1)
       val metrics = op.metrics
       assert(metrics("numInputRows").value == 27)
-      assert(metrics("numInputBatches").value == 1)
+      assert(metrics("numInputBatches").value == 14)
       assert(metrics("numOutputRows").value == 27)
-      assert(metrics("numOutputBatches").value == 7)
+      assert(metrics("numOutputBatches").value == 14)
     }
   }
 
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index f7bc1cb13..b8ea12e94 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -39,9 +39,13 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
 add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
 # TODO: ORC is not well supported. add_velox_test(orc_test SOURCES OrcTest.cc)
 add_velox_test(
-  velox_operators_test SOURCES VeloxColumnarToRowTest.cc
-  VeloxRowToColumnarTest.cc VeloxColumnarBatchSerializerTest.cc
-  VeloxColumnarBatchTest.cc)
+  velox_operators_test
+  SOURCES
+  VeloxColumnarToRowTest.cc
+  VeloxRowToColumnarTest.cc
+  VeloxColumnarBatchSerializerTest.cc
+  VeloxColumnarBatchTest.cc
+  VeloxBatchResizerTest.cc)
 add_velox_test(
   velox_plan_conversion_test
   SOURCES
diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc
new file mode 100644
index 000000000..aecd52f92
--- /dev/null
+++ b/cpp/velox/tests/VeloxBatchResizerTest.cc
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <limits>
+
+#include "utils/VeloxBatchResizer.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+class ColumnarBatchArray : public ColumnarBatchIterator {
+ public:
+  explicit ColumnarBatchArray(const 
std::vector<std::shared_ptr<ColumnarBatch>> batches)
+      : batches_(std::move(batches)) {}
+
+  std::shared_ptr<ColumnarBatch> next() override {
+    if (cursor_ >= batches_.size()) {
+      return nullptr;
+    }
+    return batches_[cursor_++];
+  }
+
+ private:
+  const std::vector<std::shared_ptr<ColumnarBatch>> batches_;
+  int32_t cursor_ = 0;
+};
+
+class VeloxBatchResizerTest : public ::testing::Test, public 
test::VectorTestBase {
+ protected:
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  RowVectorPtr newVector(size_t numRows) {
+    auto constant = makeConstant(1, numRows);
+    auto out =
+        std::make_shared<RowVector>(pool(), ROW({INTEGER()}), nullptr, 
numRows, std::vector<VectorPtr>{constant});
+    return out;
+  }
+
+  void checkResize(int32_t min, int32_t max, std::vector<int32_t> inSizes, 
std::vector<int32_t> outSizes) {
+    auto inBatches = std::vector<std::shared_ptr<ColumnarBatch>>();
+    for (const auto& size : inSizes) {
+      
inBatches.push_back(std::make_shared<VeloxColumnarBatch>(newVector(size)));
+    }
+    VeloxBatchResizer resizer(pool(), min, max, 
std::make_unique<ColumnarBatchArray>(std::move(inBatches)));
+    auto actualOutSizes = std::vector<int32_t>();
+    while (true) {
+      auto next = resizer.next();
+      if (next == nullptr) {
+        break;
+      }
+      actualOutSizes.push_back(next->numRows());
+    }
+    ASSERT_EQ(actualOutSizes, outSizes);
+  }
+};
+
+TEST_F(VeloxBatchResizerTest, sanity) {
+  checkResize(100, std::numeric_limits<int32_t>::max(), {30, 50, 30, 40, 30}, 
{110, 70});
+  checkResize(1, 40, {10, 20, 50, 30, 40, 30}, {10, 20, 40, 10, 30, 40, 30});
+  checkResize(1, 39, {10, 20, 50, 30, 40, 30}, {10, 20, 39, 11, 30, 39, 1, 
30});
+  checkResize(40, 40, {10, 20, 50, 30, 40, 30}, {30, 40, 10, 30, 40, 30});
+  checkResize(39, 39, {10, 20, 50, 30, 40, 30}, {30, 39, 11, 30, 39, 1, 30});
+  checkResize(100, 200, {5, 900, 50}, {5, 200, 200, 200, 200, 100, 50});
+  checkResize(100, 200, {5, 900, 30, 80}, {5, 200, 200, 200, 200, 100, 110});
+  checkResize(100, 200, {5, 900, 700}, {5, 200, 200, 200, 200, 100, 200, 200, 
200, 100});
+  ASSERT_ANY_THROW(checkResize(0, 0, {}, {}));
+}
+} // namespace gluten
diff --git a/cpp/velox/utils/VeloxBatchResizer.cc b/cpp/velox/utils/VeloxBatchResizer.cc
index 7b5146306..564292994 100644
--- a/cpp/velox/utils/VeloxBatchResizer.cc
+++ b/cpp/velox/utils/VeloxBatchResizer.cc
@@ -23,9 +23,7 @@ namespace {
 class SliceRowVector : public ColumnarBatchIterator {
  public:
   SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in)
-      : maxOutputBatchSize_(maxOutputBatchSize), in_(in) {
-    GLUTEN_CHECK(in->size() > maxOutputBatchSize, "Invalid state");
-  }
+      : maxOutputBatchSize_(maxOutputBatchSize), in_(in) {}
 
   std::shared_ptr<ColumnarBatch> next() override {
     int32_t remainingLength = in_->size() - cursor_;
@@ -55,7 +53,11 @@ gluten::VeloxBatchResizer::VeloxBatchResizer(
     : pool_(pool),
       minOutputBatchSize_(minOutputBatchSize),
       maxOutputBatchSize_(maxOutputBatchSize),
-      in_(std::move(in)) {}
+      in_(std::move(in)) {
+  GLUTEN_CHECK(
+      minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0,
+      "Either minOutputBatchSize or maxOutputBatchSize should be larger than 
0");
+}
 
 std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
   if (next_) {
@@ -82,6 +84,11 @@ std::shared_ptr<ColumnarBatch> VeloxBatchResizer::next() {
     for (auto nextCb = in_->next(); nextCb != nullptr; nextCb = in_->next()) {
       auto nextVb = VeloxColumnarBatch::from(pool_, nextCb);
       auto nextRv = nextVb->getRowVector();
+      if (buffer->size() + nextRv->size() > maxOutputBatchSize_) {
+        GLUTEN_CHECK(next_ == nullptr, "Invalid state");
+        next_ = std::make_unique<SliceRowVector>(maxOutputBatchSize_, nextRv);
+        return std::make_shared<VeloxColumnarBatch>(buffer);
+      }
       buffer->append(nextRv.get());
       if (buffer->size() >= minOutputBatchSize_) {
         // Buffer is full.
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index ed7a81192..1310b75a1 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -187,7 +187,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def columnarShuffleCompressionThreshold: Int =
     conf.getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD)
 
-  // FIXME: Not clear: MIN or MAX ?
   def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
 
   def columnarToRowMemThreshold: Long =
@@ -329,12 +328,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def veloxResizeBatchesShuffleInputRange: ResizeRange = {
     val standardSize = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
-    val defaultRange: ResizeRange =
-      ResizeRange((0.25 * standardSize).toInt.max(1), 4 * standardSize)
-    conf
-      .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE)
-      .map(ResizeRange.parse)
-      .getOrElse(defaultRange)
+    val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
+    val minSize = conf
+      .getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+      .getOrElse(defaultMinSize)
+    ResizeRange(minSize, Int.MaxValue)
   }
 
   def chColumnarShuffleSpillThreshold: Long = {
@@ -1492,17 +1490,16 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_RANGE =
-    
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.range")
+  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
       .internal()
       .doc(
-        s"The minimum and maximum batch sizes for shuffle. If the batch size 
is " +
-          s"smaller / bigger than minimum / maximum value, it will be combined 
with other " +
-          s"batches / split before sending to shuffle. Only functions when " +
+        s"The minimum batch size for shuffle. If size of an input batch is " +
+          s"smaller than the value, it will be combined with other " +
+          s"batches before sending to shuffle. Only functions when " +
           s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} is set to true. 
" +
-          s"A valid value for the option is min~max. " +
-          s"E.g., s.g.s.c.b.v.resizeBatches.shuffleInput.range=100~10000")
-      .stringConf
+          s"Default value: 0.25 * <max batch size>")
+      .intConf
       .createOptional
 
   val COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD =
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java
new file mode 100644
index 000000000..85c0912fd
--- /dev/null
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/SparkJvmOptions.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.integration;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+public class SparkJvmOptions {
+  private static final String MODULE_OPTIONS_CLASS_NAME = 
"org.apache.spark.launcher.JavaModuleOptions";
+
+  public static String read() {
+    try {
+      final Class<?> clazz = 
Class.forName("org.apache.spark.launcher.JavaModuleOptions");
+      final Method method = clazz.getMethod("defaultModuleOptions");
+      return (String) method.invoke(null);
+    } catch (ClassNotFoundException e) {
+      // Could happen in Spark 3.2 which doesn't have this class yet.
+      return "";
+    } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void main(String[] args) {
+    System.out.println(read());
+  }
+}
diff --git a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
index 6750b90e9..d186b5d0b 100644
--- a/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
+++ b/tools/gluten-it/common/src/main/java/org/apache/gluten/integration/command/SparkRunModes.java
@@ -332,6 +332,7 @@ public final class SparkRunModes {
     @Override
     public Map<String, String> extraSparkConf() {
       final Map<String, String> extras = new HashMap<>();
+      extras.put(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS, 
"-Dio.netty.tryReflectionSetAccessible=true");
       extras.put(SparkLauncher.EXECUTOR_CORES, 
String.valueOf(resourceEnumeration.lcExecutorCores()));
       extras.put(SparkLauncher.EXECUTOR_MEMORY, String.format("%dm", 
resourceEnumeration.lcExecutorHeapMem()));
       extras.put("spark.memory.offHeap.enabled", "true");
diff --git a/tools/gluten-it/sbin/gluten-it.sh b/tools/gluten-it/sbin/gluten-it.sh
index 00ff78e34..8c1a6413b 100755
--- a/tools/gluten-it/sbin/gluten-it.sh
+++ b/tools/gluten-it/sbin/gluten-it.sh
@@ -16,8 +16,6 @@
 
 set -euf
 
-GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G 
-XX:ErrorFile=/var/log/java/hs_err_pid%p.log"}
-
 BASEDIR=$(dirname $0)
 
 LIB_DIR=$BASEDIR/../package/target/lib
@@ -28,32 +26,25 @@ fi
 
 JAR_PATH=$LIB_DIR/*
 
+SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp $JAR_PATH 
org.apache.gluten.integration.SparkJvmOptions)
+
 EMBEDDED_SPARK_HOME=$BASEDIR/../spark-home
 
+# We temporarily disallow setting these two variables by caller.
+SPARK_HOME=""
+SPARK_SCALA_VERSION=""
 export SPARK_HOME=${SPARK_HOME:-$EMBEDDED_SPARK_HOME}
 export SPARK_SCALA_VERSION=${SPARK_SCALA_VERSION:-'2.12'}
 
 echo "SPARK_HOME set at [$SPARK_HOME]."
 echo "SPARK_SCALA_VERSION set at [$SPARK_SCALA_VERSION]."
 
-$JAVA_HOME/bin/java $GLUTEN_IT_JVM_ARGS \
-    -XX:+IgnoreUnrecognizedVMOptions \
-    --add-opens=java.base/java.lang=ALL-UNNAMED \
-    --add-opens=java.base/java.lang.invoke=ALL-UNNAMED \
-    --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
-    --add-opens=java.base/java.io=ALL-UNNAMED \
-    --add-opens=java.base/java.net=ALL-UNNAMED \
-    --add-opens=java.base/java.nio=ALL-UNNAMED \
-    --add-opens=java.base/java.util=ALL-UNNAMED \
-    --add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
-    --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
-    --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED \
-    --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
-    --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
-    --add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
-    --add-opens=java.base/sun.security.action=ALL-UNNAMED \
-    --add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
-    -Djdk.reflect.useDirectMethodHandle=false \
+GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G"}
+
+$JAVA_HOME/bin/java \
+    $SPARK_JVM_OPTIONS \
+    $GLUTEN_IT_JVM_ARGS \
+    -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \
     -Dio.netty.tryReflectionSetAccessible=true \
     -cp $JAR_PATH \
     org.apache.gluten.integration.Cli $@


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to