This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new c940e68540 [VL] Branch 1.2: Backport fixes for #7243 (#7943)
c940e68540 is described below

commit c940e6854099af2396d1b089c0db4a43bb280512
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 20 16:20:43 2024 +0800

    [VL] Branch 1.2: Backport fixes for #7243 (#7943)
    
    Co-authored-by: Zhen Wang <[email protected]>
    Co-authored-by: liuxiang <[email protected]>
---
 .../apache/gluten/test/VeloxBackendTestBase.java   |  3 +-
 cpp/velox/compute/VeloxBackend.cc                  |  4 ++
 cpp/velox/compute/VeloxPlanConverter.cc            | 13 +---
 cpp/velox/compute/WholeStageResultIterator.cc      | 33 +--------
 cpp/velox/memory/VeloxMemoryManager.cc             | 25 +++++++
 cpp/velox/operators/plannodes/RowVectorStream.h    | 84 ++++++++++++++++------
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  8 ++-
 cpp/velox/substrait/SubstraitToVeloxPlan.h         |  6 ++
 .../gluten/utils/velox/VeloxTestSettings.scala     |  4 ++
 .../parquet/GlutenParquetFilterSuite.scala         |  6 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |  4 ++
 .../parquet/GlutenParquetFilterSuite.scala         |  6 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |  4 ++
 .../parquet/GlutenParquetFilterSuite.scala         |  6 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |  4 ++
 .../parquet/GlutenParquetFilterSuite.scala         |  6 +-
 .../scala/org/apache/gluten/GlutenConfig.scala     | 16 +++--
 17 files changed, 147 insertions(+), 85 deletions(-)

diff --git a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 1d7df23566..e603755747 100644
--- a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++ b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -53,7 +53,8 @@ public abstract class VeloxBackendTestBase {
       @Override
       public SparkConf conf() {
         final SparkConf conf = new SparkConf();
-        conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
+        conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
+        conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY(), "1g");
         return conf;
       }
 
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index a3658faa3a..d88ee47789 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -232,6 +232,10 @@ void VeloxBackend::initConnector() {
       ioThreads >= 0,
       kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen.");
   if (ioThreads > 0) {
+    LOG(WARNING)
+        << "Velox background IO threads are enabled. This is currently not recommended, since it may cause"
+        << " unexpected issues such as query crashes or hangs. Please turn it off if you are unsure about"
+        << " this option.";
     ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
   }
   velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc
index 315ff2da67..ed2545c781 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter(
     bool validationMode)
     : validationMode_(validationMode),
       substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode) {
-  // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause redefinition of array abi.h.
-  auto factory = [inputIters = std::move(inputIters), validationMode = validationMode](
-                     std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) {
-    std::shared_ptr<ResultIterator> iterator;
-    if (!validationMode) {
-      VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx);
-      iterator = inputIters[streamIdx];
-    }
-    auto valueStream = std::make_shared<RowVectorStream>(pool, iterator, outputType);
-    return std::make_shared<ValueStreamNode>(nodeId, outputType, std::move(valueStream));
-  };
-  substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
+  substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
 }
 
 namespace {
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index aa36315e37..d69a7cf5fb 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -209,51 +209,22 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
   return std::make_shared<VeloxColumnarBatch>(vector);
 }
 
-namespace {
-class SuspendedSection {
- public:
-  SuspendedSection() {
-    reclaimer_->enterArbitration();
-  }
-
-  virtual ~SuspendedSection() {
-    reclaimer_->leaveArbitration();
-  }
-
-  // singleton
-  SuspendedSection(const SuspendedSection&) = delete;
-  SuspendedSection(SuspendedSection&&) = delete;
-  SuspendedSection& operator=(const SuspendedSection&) = delete;
-  SuspendedSection& operator=(SuspendedSection&&) = delete;
-
- private:
-  // We only use suspension APIs in exec::MemoryReclaimer.
-  std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer_{velox::exec::MemoryReclaimer::create()};
-};
-} // namespace
-
 int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
   auto pool = memoryManager_->getAggregateMemoryPool();
   std::string poolName{pool->root()->name() + "/" + pool->name()};
   std::string logPrefix{"Spill[" + poolName + "]: "};
   int64_t shrunken = memoryManager_->shrink(size);
-  // todo return the actual spilled size?
   if (spillStrategy_ == "auto") {
     int64_t remaining = size - shrunken;
-    LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes...";
-    // suspend the driver when we are on it
-    SuspendedSection suspender;
-    velox::exec::MemoryReclaimer::Stats status;
+    LOG(INFO) << logPrefix << "Trying to request spill for " << remaining << " bytes...";
     auto* mm = memoryManager_->getMemoryManager();
     uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
     LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << " bytes.";
     uint64_t total = shrunken + spilledOut;
     VLOG(2) << logPrefix << "Successfully reclaimed total " << total << " bytes.";
     return total;
-  } else {
-    LOG(WARNING) << "Spill-to-disk was disabled since " << kSpillStrategy << " was not configured.";
   }
-
+  LOG(WARNING) << "Spill-to-disk was disabled since " << kSpillStrategy << " was not configured.";
   VLOG(2) << logPrefix << "Successfully reclaimed total " << shrunken << " bytes.";
   return shrunken;
 }
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 442090004a..1fcd8e0296 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -35,6 +35,30 @@ namespace gluten {
 
 using namespace facebook;
 
+namespace {
+
+// Added specifically for Gluten 1.2 since https://github.com/facebookincubator/velox/pull/10705 is not included.
+class SuspendedSection {
+ public:
+  SuspendedSection(velox::memory::MemoryPool* pool) : pool_(pool) {
+    pool_->enterArbitration();
+  }
+
+  virtual ~SuspendedSection() {
+    pool_->leaveArbitration();
+  }
+
+  // non-copyable, non-movable
+  SuspendedSection(const SuspendedSection&) = delete;
+  SuspendedSection(SuspendedSection&&) = delete;
+  SuspendedSection& operator=(const SuspendedSection&) = delete;
+  SuspendedSection& operator=(SuspendedSection&&) = delete;
+
+ private:
+  velox::memory::MemoryPool* pool_;
+};
+} // namespace
+
 /// We assume in a single Spark task. No thread-safety should be guaranteed.
 class ListenableArbitrator : public velox::memory::MemoryArbitrator {
  public:
@@ -64,6 +88,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
       const std::vector<std::shared_ptr<velox::memory::MemoryPool>>& candidatePools,
       uint64_t targetBytes) override {
     velox::memory::ScopedMemoryArbitrationContext ctx(pool);
+    SuspendedSection ss(pool);
     VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only be used within a single root pool")
     auto candidate = candidatePools.back();
     VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in ListenableArbitrator");
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h
index e02b288c46..e5a469afee 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -26,18 +26,48 @@ namespace gluten {
 class RowVectorStream {
  public:
   explicit RowVectorStream(
+      facebook::velox::exec::DriverCtx* driverCtx,
       facebook::velox::memory::MemoryPool* pool,
-      std::shared_ptr<ResultIterator> iterator,
+      ResultIterator* iterator,
       const facebook::velox::RowTypePtr& outputType)
-      : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
+      : driverCtx_(driverCtx), pool_(pool), outputType_(outputType), iterator_(iterator) {}
 
   bool hasNext() {
-    return iterator_->hasNext();
+    if (finished_) {
+      return false;
+    }
+    bool hasNext;
+    {
+      // We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current
+      // driver to make the current task open to spilling.
+      //
+      // When a task is getting spilled, it should have been suspended so it has zero running threads; otherwise
+      // there's a possibility that this spill call hangs. See https://github.com/apache/incubator-gluten/issues/7243.
+      // As of now, non-zero running threads usually happens when:
+      // 1. Task A spills task B;
+      // 2. Task A tries to grow buffers created by task B, during which spill is requested on task A again.
+      facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
+      hasNext = iterator_->hasNext();
+    }
+    if (!hasNext) {
+      finished_ = true;
+    }
+    return hasNext;
   }
 
   // Convert arrow batch to rowvector and use new output columns
   facebook::velox::RowVectorPtr next() {
-    const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, iterator_->next());
+    if (finished_) {
+      return nullptr;
+    }
+    std::shared_ptr<ColumnarBatch> cb;
+    {
+      // We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current
+      // driver to make the current task open to spilling.
+      facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
+      cb = iterator_->next();
+    }
+    const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, cb);
     auto vp = vb->getRowVector();
     VELOX_DCHECK(vp != nullptr);
     return std::make_shared<facebook::velox::RowVector>(
@@ -45,31 +75,32 @@ class RowVectorStream {
   }
 
  private:
-  std::shared_ptr<ResultIterator> iterator_;
-  const facebook::velox::RowTypePtr outputType_;
+  facebook::velox::exec::DriverCtx* driverCtx_;
   facebook::velox::memory::MemoryPool* pool_;
+  const facebook::velox::RowTypePtr outputType_;
+  ResultIterator* iterator_;
+
+  bool finished_{false};
 };
 
-class ValueStreamNode : public facebook::velox::core::PlanNode {
+class ValueStreamNode final : public facebook::velox::core::PlanNode {
  public:
   ValueStreamNode(
       const facebook::velox::core::PlanNodeId& id,
       const facebook::velox::RowTypePtr& outputType,
-      std::shared_ptr<RowVectorStream> valueStream)
-      : facebook::velox::core::PlanNode(id), outputType_(outputType), valueStream_(std::move(valueStream)) {
-    VELOX_CHECK_NOT_NULL(valueStream_);
-  }
+      std::shared_ptr<ResultIterator> iterator)
+      : facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {}
 
   const facebook::velox::RowTypePtr& outputType() const override {
     return outputType_;
   }
 
   const std::vector<facebook::velox::core::PlanNodePtr>& sources() const override {
-    return kEmptySources;
+    return kEmptySources_;
   };
 
-  const std::shared_ptr<RowVectorStream>& rowVectorStream() const {
-    return valueStream_;
+  ResultIterator* iterator() const {
+    return iterator_.get();
   }
 
   std::string_view name() const override {
@@ -84,8 +115,8 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
   void addDetails(std::stringstream& stream) const override{};
 
   const facebook::velox::RowTypePtr outputType_;
-  std::shared_ptr<RowVectorStream> valueStream_;
-  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
+  std::shared_ptr<ResultIterator> iterator_;
+  const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
 };
 
 class ValueStream : public facebook::velox::exec::SourceOperator {
@@ -99,13 +130,18 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
             valueStreamNode->outputType(),
             operatorId,
             valueStreamNode->id(),
-            "ValueStream") {
-    valueStream_ = valueStreamNode->rowVectorStream();
+            valueStreamNode->name().data()) {
+    ResultIterator* itr = valueStreamNode->iterator();
+    VELOX_CHECK_NOT_NULL(itr);
+    rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr, outputType_);
   }
 
   facebook::velox::RowVectorPtr getOutput() override {
-    if (valueStream_->hasNext()) {
-      return valueStream_->next();
+    if (finished_) {
+      return nullptr;
+    }
+    if (rvStream_->hasNext()) {
+      return rvStream_->next();
     } else {
       finished_ = true;
       return nullptr;
@@ -122,12 +158,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
 
  private:
   bool finished_ = false;
-  std::shared_ptr<RowVectorStream> valueStream_;
+  std::unique_ptr<RowVectorStream> rvStream_;
 };
 
 class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator {
-  std::unique_ptr<facebook::velox::exec::Operator>
-  toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const facebook::velox::core::PlanNodePtr& node) {
+  std::unique_ptr<facebook::velox::exec::Operator> toOperator(
+      facebook::velox::exec::DriverCtx* ctx,
+      int32_t id,
+      const facebook::velox::core::PlanNodePtr& node) override {
     if (auto valueStreamNode = std::dynamic_pointer_cast<const ValueStreamNode>(node)) {
       return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
     }
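
The pattern applied to RowVectorStream above has two parts: latch a finished_ flag after the first exhausted pull so no further upstream calls are made, and wrap each upstream call in a suspension guard so the Velox task stays spillable while control is outside the driver (typically inside Spark via JNI). A compact standalone sketch of that pattern, with stand-in types in place of the real ResultIterator and facebook::velox::exec::SuspendedSection:

    #include <cstddef>
    #include <iostream>
    #include <optional>
    #include <vector>

    struct Guard { // stand-in for facebook::velox::exec::SuspendedSection
      Guard() { /* driver suspended in the real code */ }
      ~Guard() { /* driver resumed */ }
    };

    class Upstream { // stand-in for gluten::ResultIterator
     public:
      explicit Upstream(std::vector<int> rows) : rows_(std::move(rows)) {}
      bool hasNext() const { return idx_ < rows_.size(); }
      int next() { return rows_[idx_++]; }
     private:
      std::vector<int> rows_;
      std::size_t idx_ = 0;
    };

    class Stream {
     public:
      explicit Stream(Upstream* it) : it_(it) {}
      bool hasNext() {
        if (finished_) return false; // latched: never call upstream again
        bool has;
        {
          Guard g; // suspended while control is outside the driver
          has = it_->hasNext();
        }
        if (!has) finished_ = true;
        return has;
      }
      std::optional<int> next() {
        if (finished_) return std::nullopt;
        Guard g; // suspended for this upstream pull as well
        return it_->next();
      }
     private:
      Upstream* it_;
      bool finished_ = false;
    };

    int main() {
      Upstream up({1, 2, 3});
      Stream s(&up);
      while (s.hasNext()) std::cout << *s.next() << ' ';
      std::cout << '\n'; // prints: 1 2 3
    }
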
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 34710c35a4..404e99994f 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -18,6 +18,7 @@
 #include "SubstraitToVeloxPlan.h"
 #include "TypeUtils.h"
 #include "VariantToVectorConverter.h"
+#include "operators/plannodes/RowVectorStream.h"
 #include "velox/connectors/hive/HiveDataSink.h"
 #include "velox/exec/TableWriter.h"
 #include "velox/type/Filter.h"
@@ -1107,7 +1108,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
   }
 
   auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
-  auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, outputType);
+  std::shared_ptr<ResultIterator> iterator;
+  if (!validationMode_) {
+    VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
+    iterator = inputIters_[streamIdx];
+  }
+  auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(iterator));
 
   auto splitInfo = std::make_shared<SplitInfo>();
   splitInfo->isStream = true;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 1f2f39f51a..0e892469d0 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter {
     valueStreamNodeFactory_ = std::move(factory);
   }
 
+  void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
+    inputIters_ = std::move(inputIters);
+  }
+
   /// Used to check if ReadRel specifies an input of stream.
   /// If yes, the index of input stream will be returned.
   /// If not, -1 will be returned.
@@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter {
 
   std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> valueStreamNodeFactory_;
 
+  std::vector<std::shared_ptr<ResultIterator>> inputIters_;
+
   /// The map storing the pre-built plan nodes which can be accessed through
   /// index. This map is only used when the computation of a Substrait plan
   /// depends on other input nodes.
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a17f72de31..59abfb37e6 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -866,6 +866,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
     .exclude("Filter applied on merged Parquet schema with new column should work")
@@ -882,6 +884,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 4d190f9d59..51204b0777 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -48,7 +48,7 @@ import java.time.LocalDate
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
   protected def checkFilterPredicate(
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
@@ -357,7 +357,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
 }
 
 @ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
@@ -445,7 +445,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
 }
 
 @ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index ae3e7c7b8e..055b4d5cb2 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -677,6 +677,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
     .exclude("Filter applied on merged Parquet schema with new column should work")
@@ -695,6 +697,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("Support Parquet column index")
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 20f5f8aed6..a1163f9525 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -48,7 +48,7 @@ import java.time.LocalDate
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
   protected def checkFilterPredicate(
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
@@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
 }
 
 @ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
@@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
 }
 
 @ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 0da19922ff..5918fd5344 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -658,6 +658,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
     .exclude("Filter applied on merged Parquet schema with new column should work")
@@ -676,6 +678,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 471d88f362..1f06c4ec54 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -48,7 +48,7 @@ import java.time.LocalDate
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
   protected def checkFilterPredicate(
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
@@ -329,7 +329,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
 }
 
 @ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
@@ -417,7 +417,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
 }
 
 @ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index e54aca34ec..ed2b20ffe4 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -663,6 +663,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
     .exclude("Filter applied on merged Parquet schema with new column should work")
@@ -681,6 +683,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
+    // https://github.com/apache/incubator-gluten/issues/7174
+    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 4141acee31..063b424e0d 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -47,7 +47,7 @@ import java.time.LocalDate
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQLTestsBaseTrait {
   protected def checkFilterPredicate(
       predicate: Predicate,
       filterClass: Class[_ <: FilterPredicate],
@@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
 }
 
 @ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
  // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
@@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with GlutenSQL
 }
 
 @ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with GlutenSQLTestsBaseTrait {
   // TODO: enable Parquet V2 write path after file source V2 writers are workable.
   override def sparkConf: SparkConf =
     super.sparkConf
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index fd1a4907cd..67bd4716df 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -274,7 +274,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def veloxSsdODirectEnabled: Boolean = conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
 
   def veloxConnectorIOThreads: Int = {
-    conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
+    conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS)
  }
 
   def veloxSplitPreloadPerDriver: Integer = conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
@@ -710,7 +710,7 @@ object GlutenConfig {
       (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
       (
         COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
-        conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
+        COLUMNAR_VELOX_CONNECTOR_IO_THREADS.defaultValueString),
       (COLUMNAR_SHUFFLE_CODEC.key, ""),
       (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
       ("spark.hadoop.input.connect.timeout", "180000"),
@@ -1331,13 +1331,19 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(false)
 
+  // FIXME: May cause issues when toggled on. Examples:
+  //  https://github.com/apache/incubator-gluten/issues/7161
+  //  https://github.com/facebookincubator/velox/issues/10173
   val COLUMNAR_VELOX_CONNECTOR_IO_THREADS =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads")
       .internal()
-      .doc("The Size of the IO thread pool in the Connector. This thread pool is used for split" +
-        " preloading and DirectBufferedInput.")
+      .doc(
+        "Experimental: The size of the IO thread pool in the Connector." +
+          " This thread pool is used for split preloading and DirectBufferedInput." +
+          " The option is experimental. Enabling it (setting a non-zero value) may cause some" +
+          " unexpected issues when the application reaches certain conditions.")
       .intConf
-      .createOptional
+      .createWithDefault(0)
 
   val COLUMNAR_VELOX_ASYNC_TIMEOUT =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
