This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new c940e68540 [VL] Branch 1.2: Backport fixes for #7243 (#7943)
c940e68540 is described below
commit c940e6854099af2396d1b089c0db4a43bb280512
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 20 16:20:43 2024 +0800
[VL] Branch 1.2: Backport fixes for #7243 (#7943)
Co-authored-by: Zhen Wang <[email protected]>
Co-authored-by: liuxiang <[email protected]>
---
.../apache/gluten/test/VeloxBackendTestBase.java | 3 +-
cpp/velox/compute/VeloxBackend.cc | 4 ++
cpp/velox/compute/VeloxPlanConverter.cc | 13 +---
cpp/velox/compute/WholeStageResultIterator.cc | 33 +--------
cpp/velox/memory/VeloxMemoryManager.cc | 25 +++++++
cpp/velox/operators/plannodes/RowVectorStream.h | 84 ++++++++++++++++------
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 8 ++-
cpp/velox/substrait/SubstraitToVeloxPlan.h | 6 ++
.../gluten/utils/velox/VeloxTestSettings.scala | 4 ++
.../parquet/GlutenParquetFilterSuite.scala | 6 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 4 ++
.../parquet/GlutenParquetFilterSuite.scala | 6 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 4 ++
.../parquet/GlutenParquetFilterSuite.scala | 6 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 4 ++
.../parquet/GlutenParquetFilterSuite.scala | 6 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 16 +++--
17 files changed, 147 insertions(+), 85 deletions(-)
diff --git
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 1d7df23566..e603755747 100644
---
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -53,7 +53,8 @@ public abstract class VeloxBackendTestBase {
@Override
public SparkConf conf() {
final SparkConf conf = new SparkConf();
- conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY(), "0");
+ conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(),
"0");
+ conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index a3658faa3a..d88ee47789 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -232,6 +232,10 @@ void VeloxBackend::initConnector() {
ioThreads >= 0,
kVeloxIOThreads + " was set to negative number " +
std::to_string(ioThreads) + ", this should not happen.");
if (ioThreads > 0) {
+ LOG(WARNING)
+ << "Velox background IO threads is enabled. Which is highly
unrecommended as of now, since it may cause"
+ << " some unexpected issues like query crash or hanging. Please turn
it off if you are unsure about"
+ << " this option.";
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
}
velox::connector::registerConnector(std::make_shared<velox::connector::hive::HiveConnector>(
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index 315ff2da67..ed2545c781 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter(
bool validationMode)
: validationMode_(validationMode),
substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath,
validationMode) {
- // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause
redefinition of array abi.h.
- auto factory = [inputIters = std::move(inputIters), validationMode =
validationMode](
- std::string nodeId, memory::MemoryPool* pool, int32_t
streamIdx, RowTypePtr outputType) {
- std::shared_ptr<ResultIterator> iterator;
- if (!validationMode) {
- VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream
index {} in input iterator list.", streamIdx);
- iterator = inputIters[streamIdx];
- }
- auto valueStream = std::make_shared<RowVectorStream>(pool, iterator,
outputType);
- return std::make_shared<ValueStreamNode>(nodeId, outputType,
std::move(valueStream));
- };
- substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
+ substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}
namespace {
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index aa36315e37..d69a7cf5fb 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -209,51 +209,22 @@ std::shared_ptr<ColumnarBatch>
WholeStageResultIterator::next() {
return std::make_shared<VeloxColumnarBatch>(vector);
}
-namespace {
-class SuspendedSection {
- public:
- SuspendedSection() {
- reclaimer_->enterArbitration();
- }
-
- virtual ~SuspendedSection() {
- reclaimer_->leaveArbitration();
- }
-
- // singleton
- SuspendedSection(const SuspendedSection&) = delete;
- SuspendedSection(SuspendedSection&&) = delete;
- SuspendedSection& operator=(const SuspendedSection&) = delete;
- SuspendedSection& operator=(SuspendedSection&&) = delete;
-
- private:
- // We only use suspension APIs in exec::MemoryReclaimer.
- std::unique_ptr<velox::memory::MemoryReclaimer>
reclaimer_{velox::exec::MemoryReclaimer::create()};
-};
-} // namespace
-
int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
auto pool = memoryManager_->getAggregateMemoryPool();
std::string poolName{pool->root()->name() + "/" + pool->name()};
std::string logPrefix{"Spill[" + poolName + "]: "};
int64_t shrunken = memoryManager_->shrink(size);
- // todo return the actual spilled size?
if (spillStrategy_ == "auto") {
int64_t remaining = size - shrunken;
- LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining
<< " bytes...";
- // suspend the driver when we are on it
- SuspendedSection suspender;
- velox::exec::MemoryReclaimer::Stats status;
+ LOG(INFO) << logPrefix << "Trying to request spill for " << remaining << "
bytes...";
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining);
// this conducts spilling
LOG(INFO) << logPrefix << "Successfully spilled out " << spilledOut << "
bytes.";
uint64_t total = shrunken + spilledOut;
VLOG(2) << logPrefix << "Successfully reclaimed total " << total << "
bytes.";
return total;
- } else {
- LOG(WARNING) << "Spill-to-disk was disabled since " << kSpillStrategy << "
was not configured.";
}
-
+ LOG(WARNING) << "Spill-to-disk was disabled since " << kSpillStrategy << "
was not configured.";
VLOG(2) << logPrefix << "Successfully reclaimed total " << shrunken << "
bytes.";
return shrunken;
}
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc
b/cpp/velox/memory/VeloxMemoryManager.cc
index 442090004a..1fcd8e0296 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -35,6 +35,30 @@ namespace gluten {
using namespace facebook;
+namespace {
+
+// Added specifically for Gluten 1.2 since
https://github.com/facebookincubator/velox/pull/10705 is not included.
+class SuspendedSection {
+ public:
+ SuspendedSection(velox::memory::MemoryPool* pool) : pool_(pool) {
+ pool_->enterArbitration();
+ }
+
+ virtual ~SuspendedSection() {
+ pool_->leaveArbitration();
+ }
+
+ // non-copyable and non-movable
+ SuspendedSection(const SuspendedSection&) = delete;
+ SuspendedSection(SuspendedSection&&) = delete;
+ SuspendedSection& operator=(const SuspendedSection&) = delete;
+ SuspendedSection& operator=(SuspendedSection&&) = delete;
+
+ private:
+ velox::memory::MemoryPool* pool_;
+};
+} // namespace
+
/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
public:
@@ -64,6 +88,7 @@ class ListenableArbitrator : public
velox::memory::MemoryArbitrator {
const std::vector<std::shared_ptr<velox::memory::MemoryPool>>&
candidatePools,
uint64_t targetBytes) override {
velox::memory::ScopedMemoryArbitrationContext ctx(pool);
+ SuspendedSection ss(pool);
VELOX_CHECK_EQ(candidatePools.size(), 1, "ListenableArbitrator should only
be used within a single root pool")
auto candidate = candidatePools.back();
VELOX_CHECK(pool->root() == candidate.get(), "Illegal state in
ListenableArbitrator");
diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h
b/cpp/velox/operators/plannodes/RowVectorStream.h
index e02b288c46..e5a469afee 100644
--- a/cpp/velox/operators/plannodes/RowVectorStream.h
+++ b/cpp/velox/operators/plannodes/RowVectorStream.h
@@ -26,18 +26,48 @@ namespace gluten {
class RowVectorStream {
public:
explicit RowVectorStream(
+ facebook::velox::exec::DriverCtx* driverCtx,
facebook::velox::memory::MemoryPool* pool,
- std::shared_ptr<ResultIterator> iterator,
+ ResultIterator* iterator,
const facebook::velox::RowTypePtr& outputType)
- : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
+ : driverCtx_(driverCtx), pool_(pool), outputType_(outputType),
iterator_(iterator) {}
bool hasNext() {
- return iterator_->hasNext();
+ if (finished_) {
+ return false;
+ }
+ bool hasNext;
+ {
+ // We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
+ // driver to make the current task open to spilling.
+ //
+ // When a task is getting spilled, it should have been suspended so has
zero running threads, otherwise there's
+ // possibility that this spill call hangs. See
https://github.com/apache/incubator-gluten/issues/7243.
+ // As of now, non-zero running threads usually happens when:
+ // 1. Task A spills task B;
+ // 2. Task A tries to grow buffers created by task B, during which spill
is requested on task A again.
+ facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
+ hasNext = iterator_->hasNext();
+ }
+ if (!hasNext) {
+ finished_ = true;
+ }
+ return hasNext;
}
// Convert arrow batch to rowvector and use new output columns
facebook::velox::RowVectorPtr next() {
- const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, iterator_->next());
+ if (finished_) {
+ return nullptr;
+ }
+ std::shared_ptr<ColumnarBatch> cb;
+ {
+ // We are leaving Velox task execution and are probably entering Spark
code through JNI. Suspend the current
+ // driver to make the current task open to spilling.
+ facebook::velox::exec::SuspendedSection ss(driverCtx_->driver);
+ cb = iterator_->next();
+ }
+ const std::shared_ptr<VeloxColumnarBatch>& vb =
VeloxColumnarBatch::from(pool_, cb);
auto vp = vb->getRowVector();
VELOX_DCHECK(vp != nullptr);
return std::make_shared<facebook::velox::RowVector>(
@@ -45,31 +75,32 @@ class RowVectorStream {
}
private:
- std::shared_ptr<ResultIterator> iterator_;
- const facebook::velox::RowTypePtr outputType_;
+ facebook::velox::exec::DriverCtx* driverCtx_;
facebook::velox::memory::MemoryPool* pool_;
+ const facebook::velox::RowTypePtr outputType_;
+ ResultIterator* iterator_;
+
+ bool finished_{false};
};
-class ValueStreamNode : public facebook::velox::core::PlanNode {
+class ValueStreamNode final : public facebook::velox::core::PlanNode {
public:
ValueStreamNode(
const facebook::velox::core::PlanNodeId& id,
const facebook::velox::RowTypePtr& outputType,
- std::shared_ptr<RowVectorStream> valueStream)
- : facebook::velox::core::PlanNode(id), outputType_(outputType),
valueStream_(std::move(valueStream)) {
- VELOX_CHECK_NOT_NULL(valueStream_);
- }
+ std::shared_ptr<ResultIterator> iterator)
+ : facebook::velox::core::PlanNode(id), outputType_(outputType),
iterator_(std::move(iterator)) {}
const facebook::velox::RowTypePtr& outputType() const override {
return outputType_;
}
const std::vector<facebook::velox::core::PlanNodePtr>& sources() const
override {
- return kEmptySources;
+ return kEmptySources_;
};
- const std::shared_ptr<RowVectorStream>& rowVectorStream() const {
- return valueStream_;
+ ResultIterator* iterator() const {
+ return iterator_.get();
}
std::string_view name() const override {
@@ -84,8 +115,8 @@ class ValueStreamNode : public
facebook::velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override{};
const facebook::velox::RowTypePtr outputType_;
- std::shared_ptr<RowVectorStream> valueStream_;
- const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
+ std::shared_ptr<ResultIterator> iterator_;
+ const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
};
class ValueStream : public facebook::velox::exec::SourceOperator {
@@ -99,13 +130,18 @@ class ValueStream : public
facebook::velox::exec::SourceOperator {
valueStreamNode->outputType(),
operatorId,
valueStreamNode->id(),
- "ValueStream") {
- valueStream_ = valueStreamNode->rowVectorStream();
+ valueStreamNode->name().data()) {
+ ResultIterator* itr = valueStreamNode->iterator();
+ VELOX_CHECK_NOT_NULL(itr);
+ rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr,
outputType_);
}
facebook::velox::RowVectorPtr getOutput() override {
- if (valueStream_->hasNext()) {
- return valueStream_->next();
+ if (finished_) {
+ return nullptr;
+ }
+ if (rvStream_->hasNext()) {
+ return rvStream_->next();
} else {
finished_ = true;
return nullptr;
@@ -122,12 +158,14 @@ class ValueStream : public
facebook::velox::exec::SourceOperator {
private:
bool finished_ = false;
- std::shared_ptr<RowVectorStream> valueStream_;
+ std::unique_ptr<RowVectorStream> rvStream_;
};
class RowVectorStreamOperatorTranslator : public
facebook::velox::exec::Operator::PlanNodeTranslator {
- std::unique_ptr<facebook::velox::exec::Operator>
- toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const
facebook::velox::core::PlanNodePtr& node) {
+ std::unique_ptr<facebook::velox::exec::Operator> toOperator(
+ facebook::velox::exec::DriverCtx* ctx,
+ int32_t id,
+ const facebook::velox::core::PlanNodePtr& node) override {
if (auto valueStreamNode = std::dynamic_pointer_cast<const
ValueStreamNode>(node)) {
return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 34710c35a4..404e99994f 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -18,6 +18,7 @@
#include "SubstraitToVeloxPlan.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
+#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Filter.h"
@@ -1107,7 +1108,12 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::constructValueStreamNode(
}
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
- auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx,
outputType);
+ std::shared_ptr<ResultIterator> iterator;
+ if (!validationMode_) {
+ VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index
{} in input iterator list.", streamIdx);
+ iterator = inputIters_[streamIdx];
+ }
+ auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType,
std::move(iterator));
auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 1f2f39f51a..0e892469d0 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter {
valueStreamNodeFactory_ = std::move(factory);
}
+ void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
+ inputIters_ = std::move(inputIters);
+ }
+
/// Used to check if ReadRel specifies an input of stream.
/// If yes, the index of input stream will be returned.
/// If not, -1 will be returned.
@@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter {
std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t,
RowTypePtr)> valueStreamNodeFactory_;
+ std::vector<std::shared_ptr<ResultIterator>> inputIters_;
+
/// The map storing the pre-built plan nodes which can be accessed through
/// index. This map is only used when the computation of a Substrait plan
/// depends on other input nodes.
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a17f72de31..59abfb37e6 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -866,6 +866,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should
work")
@@ -882,6 +884,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down")
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 4d190f9d59..51204b0777 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -357,7 +357,7 @@ abstract class GltuenParquetFilterSuite extends
ParquetFilterSuite with GlutenSQ
}
@ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -445,7 +445,7 @@ class GlutenParquetV1FilterSuite extends
GltuenParquetFilterSuite with GlutenSQL
}
@ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index ae3e7c7b8e..055b4d5cb2 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -677,6 +677,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should
work")
@@ -695,6 +697,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Support Parquet column index")
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 20f5f8aed6..a1163f9525 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends
ParquetFilterSuite with GlutenSQ
}
@ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends
GltuenParquetFilterSuite with GlutenSQL
}
@ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 0da19922ff..5918fd5344 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -658,6 +658,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
.exclude("filter pushdown - StringPredicate")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should
work")
@@ -676,6 +678,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
.exclude("filter pushdown - StringPredicate")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 471d88f362..1f06c4ec54 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -48,7 +48,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -329,7 +329,7 @@ abstract class GltuenParquetFilterSuite extends
ParquetFilterSuite with GlutenSQ
}
@ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -417,7 +417,7 @@ class GlutenParquetV1FilterSuite extends
GltuenParquetFilterSuite with GlutenSQL
}
@ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index e54aca34ec..ed2b20ffe4 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -663,6 +663,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
.exclude("filter pushdown - StringPredicate")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetV2FilterSuite]
// Rewrite.
.exclude("Filter applied on merged Parquet schema with new column should
work")
@@ -681,6 +683,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-34562: Bloom filter push down")
.exclude("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
.exclude("filter pushdown - StringPredicate")
+ // https://github.com/apache/incubator-gluten/issues/7174
+ .excludeGlutenTest("Filter applied on merged Parquet schema with new
column should work")
enableSuite[GlutenParquetInteroperabilitySuite]
.exclude("parquet timestamp conversion")
enableSuite[GlutenParquetIOSuite]
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 4141acee31..063b424e0d 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -47,7 +47,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-abstract class GltuenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with
GlutenSQLTestsBaseTrait {
protected def checkFilterPredicate(
predicate: Predicate,
filterClass: Class[_ <: FilterPredicate],
@@ -328,7 +328,7 @@ abstract class GltuenParquetFilterSuite extends
ParquetFilterSuite with GlutenSQ
}
@ExtendedSQLTest
-class GlutenParquetV1FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV1FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
@@ -416,7 +416,7 @@ class GlutenParquetV1FilterSuite extends
GltuenParquetFilterSuite with GlutenSQL
}
@ExtendedSQLTest
-class GlutenParquetV2FilterSuite extends GltuenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
+class GlutenParquetV2FilterSuite extends GlutenParquetFilterSuite with
GlutenSQLTestsBaseTrait {
// TODO: enable Parquet V2 write path after file source V2 writers are
workable.
override def sparkConf: SparkConf =
super.sparkConf
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index fd1a4907cd..67bd4716df 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -274,7 +274,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def veloxSsdODirectEnabled: Boolean =
conf.getConf(COLUMNAR_VELOX_SSD_ODIRECT_ENABLED)
def veloxConnectorIOThreads: Int = {
-
conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS).getOrElse(numTaskSlotsPerExecutor)
+ conf.getConf(COLUMNAR_VELOX_CONNECTOR_IO_THREADS)
}
def veloxSplitPreloadPerDriver: Integer =
conf.getConf(COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER)
@@ -710,7 +710,7 @@ object GlutenConfig {
(AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
- conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
+ COLUMNAR_VELOX_CONNECTOR_IO_THREADS.defaultValueString),
(COLUMNAR_SHUFFLE_CODEC.key, ""),
(COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
("spark.hadoop.input.connect.timeout", "180000"),
@@ -1331,13 +1331,19 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)
+ // FIXME: May cause issues when toggled on. Examples:
+ // https://github.com/apache/incubator-gluten/issues/7161
+ // https://github.com/facebookincubator/velox/issues/10173
val COLUMNAR_VELOX_CONNECTOR_IO_THREADS =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.IOThreads")
.internal()
- .doc("The Size of the IO thread pool in the Connector. This thread pool
is used for split" +
- " preloading and DirectBufferedInput.")
+ .doc(
+ "Experimental: The Size of the IO thread pool in the Connector." +
+ " This thread pool is used for split preloading and
DirectBufferedInput." +
+ " The option is experimental. Toggling on it (setting a non-zero
value) may cause some" +
+ " unexpected issues when application reaches some certain
conditions.")
.intConf
- .createOptional
+ .createWithDefault(0)
val COLUMNAR_VELOX_ASYNC_TIMEOUT =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]