This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b69a6e8bc [GLUTEN-7031][VL] Minimized backend API (#7218)
b69a6e8bc is described below
commit b69a6e8bcf6432199fad3863a003913c3ab2cd10
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Sep 13 14:06:33 2024 +0800
[GLUTEN-7031][VL] Minimized backend API (#7218)
---
.../LowCopyFileSegmentShuffleInputStream.java | 12 +--
.../vectorized/LowCopyNettyShuffleInputStream.java | 10 +--
.../vectorized/OnHeapCopyShuffleInputStream.java | 50 ++++++------
...i.Backend => org.apache.gluten.backend.Backend} | 0
.../gluten/backendsapi/clickhouse/CHBackend.scala | 9 ++-
.../backendsapi/clickhouse/CHIteratorApi.scala | 2 +-
.../backendsapi/clickhouse/CHMetricsApi.scala | 2 +-
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 3 +
.../clickhouse/CHSparkPlanExecApi.scala | 4 -
...i.Backend => org.apache.gluten.backend.Backend} | 0
.../gluten/backendsapi/velox/VeloxBackend.scala | 20 +++--
.../backendsapi/velox/VeloxListenerApi.scala | 5 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 3 +
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 20 -----
.../apache/gluten/test/VeloxBackendTestBase.java | 26 ++++---
gluten-core/pom.xml | 5 ++
.../apache/gluten/exception/GlutenException.java | 10 ---
.../DynamicOffHeapSizingMemoryTarget.java | 2 +
.../gluten/memory/memtarget/MemoryTargets.java | 2 +
.../gluten/softaffinity/SoftAffinityManager.scala | 5 +-
.../strategy/SoftAffinityAllocationTrait.scala | 0
.../strategy/SoftAffinityStrategy.scala | 6 +-
.../apache/spark/softaffinity/SoftAffinity.scala | 3 +-
.../spark/softaffinity}/SoftAffinityListener.scala | 11 ++-
.../scala/org/apache/gluten/GlutenPlugin.scala | 72 ++++++++---------
.../scala/org/apache/gluten/backend/Backend.scala | 90 +++++++++-------------
.../gluten/extension/GlutenColumnarRule.scala | 10 +--
.../gluten/extension/GlutenSessionExtensions.scala | 4 +-
.../extension/columnar/ColumnarRuleApplier.scala | 9 ++-
.../columnar/enumerated/EnumeratedApplier.scala | 22 +++---
.../columnar/heuristic/FallbackNode.scala | 22 +++---
.../columnar/heuristic/HeuristicApplier.scala | 41 +++++-----
.../extension/columnar/transition/Convention.scala | 0
.../columnar/transition/ConventionFunc.scala | 6 +-
.../columnar/transition/ConventionReq.scala | 4 +-
.../extension/columnar/transition/Transition.scala | 0
.../columnar/transition/Transitions.scala | 4 +-
.../extension/columnar/transition/package.scala | 0
.../gluten/extension/injector/GlutenInjector.scala | 26 +++++--
.../gluten/extension/injector/RuleInjector.scala | 0
.../gluten/extension/injector/SparkInjector.scala | 0
.../gluten/extension}/util/AdaptiveContext.scala | 2 +-
.../src/main/scala/org/apache/gluten/gluten.scala | 0
.../org/apache/gluten/logging}/LogLevelUtil.scala | 2 +-
.../org/apache/spark/util/SparkResourceUtil.scala | 0
gluten-substrait/pom.xml | 5 --
.../java/org/apache/gluten/test/TestStats.java | 25 +++---
.../gluten/backendsapi/BackendSettingsApi.scala | 8 --
.../gluten/backendsapi/BackendsApiManager.scala | 26 +------
.../gluten/backendsapi/SparkPlanExecApi.scala | 11 ---
.../{Backend.scala => SubstraitBackend.scala} | 40 +++++-----
.../org/apache/gluten/extension/GlutenPlan.scala | 5 +-
.../extension/columnar/ExpandFallbackPolicy.scala | 10 +--
.../extension/columnar/MiscColumnarRules.scala | 3 +-
.../extension/columnar/OffloadSingleNode.scala | 3 +-
.../columnar/enumerated/EnumeratedTransform.scala | 2 +-
.../scala/org/apache/gluten/utils/PlanUtil.scala | 6 +-
.../apache/gluten/utils/QueryPlanSelector.scala | 6 +-
.../spark/listener/GlutenListenerFactory.scala | 35 ---------
.../ColumnarCollapseTransformStages.scala | 3 +-
.../sql/execution/GlutenFallbackReporter.scala | 2 +-
.../apache/spark/sql/hive/HiveUDAFInspector.scala | 0
.../spark/softaffinity/SoftAffinitySuite.scala | 1 -
.../SoftAffinityWithRDDInfoSuite.scala | 3 +-
.../spark/sql/execution/ui/GlutenEventUtils.scala | 6 --
.../execution/ui/GlutenSQLAppStatusListener.scala | 10 ++-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../extension/GlutenSessionExtensionSuite.scala | 5 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../extension/GlutenSessionExtensionSuite.scala | 5 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../extension/GlutenSessionExtensionSuite.scala | 5 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../extension/GlutenSessionExtensionSuite.scala | 5 +-
.../scala/org/apache/gluten/GlutenConfig.scala | 15 ++--
75 files changed, 360 insertions(+), 421 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
index 3d9157a8e..62dd57664 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyFileSegmentShuffleInputStream.java
@@ -89,11 +89,11 @@ public class LowCopyFileSegmentShuffleInputStream
implements ShuffleInputStream
@Override
public void close() {
- GlutenException.wrap(
- () -> {
- channel.close();
- in.close();
- return null;
- });
+ try {
+ channel.close();
+ in.close();
+ } catch (Exception e) {
+ throw new GlutenException(e);
+ }
}
}
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
index 0eb921579..ca6ad213d 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/LowCopyNettyShuffleInputStream.java
@@ -65,10 +65,10 @@ public class LowCopyNettyShuffleInputStream implements
ShuffleInputStream {
@Override
public void close() {
- GlutenException.wrap(
- () -> {
- in.close();
- return null;
- });
+ try {
+ in.close();
+ } catch (Exception e) {
+ throw new GlutenException(e);
+ }
}
}
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
index ea7a1e236..0b1bd60be 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/OnHeapCopyShuffleInputStream.java
@@ -37,24 +37,26 @@ public class OnHeapCopyShuffleInputStream implements
ShuffleInputStream {
@Override
public long read(long destAddress, long maxReadSize) {
- return GlutenException.wrap(
- () -> {
- int maxReadSize32 = Math.toIntExact(maxReadSize);
- if (buffer == null || maxReadSize32 > buffer.length) {
- this.buffer = new byte[maxReadSize32];
- }
- // The code conducts copy as long as 'in' wraps off-heap data,
- // which is about to be moved to heap
- int read = in.read(buffer, 0, maxReadSize32);
- if (read == -1 || read == 0) {
- return 0;
- }
- // The code conducts copy, from heap to off-heap
- // memCopyFromHeap(buffer, destAddress, read);
- PlatformDependent.copyMemory(buffer, 0, destAddress, read);
- bytesRead += read;
- return read;
- });
+ try {
+
+ int maxReadSize32 = Math.toIntExact(maxReadSize);
+ if (buffer == null || maxReadSize32 > buffer.length) {
+ this.buffer = new byte[maxReadSize32];
+ }
+ // The code conducts copy as long as 'in' wraps off-heap data,
+ // which is about to be moved to heap
+ int read = in.read(buffer, 0, maxReadSize32);
+ if (read == -1 || read == 0) {
+ return 0;
+ }
+ // The code conducts copy, from heap to off-heap
+ // memCopyFromHeap(buffer, destAddress, read);
+ PlatformDependent.copyMemory(buffer, 0, destAddress, read);
+ bytesRead += read;
+ return read;
+ } catch (Exception e) {
+ throw new GlutenException(e);
+ }
}
@Override
@@ -69,11 +71,11 @@ public class OnHeapCopyShuffleInputStream implements
ShuffleInputStream {
@Override
public void close() {
- GlutenException.wrap(
- () -> {
- in.close();
- in = null;
- return null;
- });
+ try {
+ in.close();
+ in = null;
+ } catch (Exception e) {
+ throw new GlutenException(e);
+ }
}
}
diff --git
a/backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend
b/backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
similarity index 100%
rename from
backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend
rename to
backends-clickhouse/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 45aee4322..9fd66b473 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -17,10 +17,12 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
+import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi._
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
@@ -43,10 +45,11 @@ import java.util.Locale
import scala.util.control.Breaks.{break, breakable}
-class CHBackend extends Backend {
+class CHBackend extends SubstraitBackend {
override def name(): String = CHBackend.BACKEND_NAME
- override def buildInfo(): BackendBuildInfo =
- BackendBuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
+ override def batchType: Convention.BatchType = CHBatch
+ override def buildInfo(): Backend.BuildInfo =
+ Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
override def iteratorApi(): IteratorApi = new CHIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi
override def transformerApi(): TransformerApi = new CHTransformerApi
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index f33e767e1..d3a0b7e28 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -20,13 +20,13 @@ import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.gluten.utils.LogLevelUtil
import org.apache.gluten.vectorized.{BatchIterator,
CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator}
import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index c53448cdd..d84181fec 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -17,9 +17,9 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.MetricsApi
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.metrics._
import org.apache.gluten.substrait.{AggregationParams, JoinParams}
-import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 04644d4a2..d5e079508 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.{RuleInjector,
SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector,
RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser,
GlutenClickhouseSqlParser}
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
GlutenFallbackReporter}
@@ -34,6 +35,8 @@ import org.apache.spark.util.SparkPlanRules
class CHRuleApi extends RuleApi {
import CHRuleApi._
override def injectRules(injector: RuleInjector): Unit = {
+ injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
+
injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index f765a75d2..c9c6a14a8 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -24,7 +24,6 @@ import org.apache.gluten.expression._
import org.apache.gluten.extension.ExpressionExtensionTrait
import org.apache.gluten.extension.columnar.AddFallbackTagRule
import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
-import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
@@ -67,9 +66,6 @@ import scala.collection.mutable.ArrayBuffer
class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
- /** The columnar-batch type this backend is using. */
- override def batchType: Convention.BatchType = CHBatch
-
/** Transform GetArrayItem to Substrait. */
override def genGetArrayItemTransformer(
substraitExprName: String,
diff --git
a/backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend
b/backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
similarity index 100%
rename from
backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backendsapi.Backend
rename to
backends-velox/src/main/resources/META-INF/services/org.apache.gluten.backend.Backend
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index ecd053a63..19985d6b8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -17,11 +17,14 @@
package org.apache.gluten.backendsapi.velox
import org.apache.gluten.{GlutenConfig, VELOX_BRANCH, VELOX_REVISION,
VELOX_REVISION_TIME}
+import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi._
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+import
org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat,
OrcReadFormat, ParquetReadFormat}
@@ -32,6 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
CumeDist, DenseRank, De
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
ApproximatePercentile}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -43,10 +48,17 @@ import org.apache.hadoop.fs.Path
import scala.util.control.Breaks.breakable
-class VeloxBackend extends Backend {
+class VeloxBackend extends SubstraitBackend {
override def name(): String = VeloxBackend.BACKEND_NAME
- override def buildInfo(): BackendBuildInfo =
- BackendBuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION,
VELOX_REVISION_TIME)
+ override def batchType: Convention.BatchType = VeloxBatch
+ override def batchTypeFunc(): BatchOverride = {
+ case i: InMemoryTableScanExec
+ if i.supportsColumnar && i.relation.cacheBuilder.serializer
+ .isInstanceOf[ColumnarCachedBatchSerializer] =>
+ VeloxBatch
+ }
+ override def buildInfo(): Backend.BuildInfo =
+ Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION,
VELOX_REVISION_TIME)
override def iteratorApi(): IteratorApi = new VeloxIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi
override def transformerApi(): TransformerApi = new VeloxTransformerApi
@@ -454,7 +466,5 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true
- override def generateHdfsConfForLibhdfs(): Boolean = true
-
override def needPreComputeRangeFrameBoundary(): Boolean = true
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 02597f815..2cec8d392 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.jni.{JniLibLoader, JniWorkspace}
import org.apache.gluten.udf.UdfJniWrapper
import org.apache.gluten.utils._
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
@@ -44,6 +44,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()
+ // Generate HDFS client configurations.
+ HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)
+
// Overhead memory limits.
val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
val desiredOverheadSize = (0.1 *
offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 6204ab092..7f95c78be 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -28,6 +28,7 @@ import
org.apache.gluten.extension.columnar.transition.{InsertTransitions, Remov
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector,
RasInjector}
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
GlutenFallbackReporter}
@@ -35,6 +36,8 @@ class VeloxRuleApi extends RuleApi {
import VeloxRuleApi._
override def injectRules(injector: RuleInjector): Unit = {
+ injector.gluten.skipOn(PhysicalPlanSelector.skipCond)
+
injectSpark(injector.spark)
injectLegacy(injector.gluten.legacy)
injectRas(injector.gluten.ras)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 4a9bfef55..64c212690 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -23,8 +23,6 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.aggregate.{HLLAdapter,
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
import org.apache.gluten.extension.columnar.FallbackTags
-import org.apache.gluten.extension.columnar.transition.Convention
-import
org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.vectorized.{ColumnarBatchSerializer,
ColumnarBatchSerializeResult}
@@ -42,7 +40,6 @@ import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation,
HashedRelationBroadcastMode}
@@ -61,23 +58,6 @@ import javax.ws.rs.core.UriBuilder
class VeloxSparkPlanExecApi extends SparkPlanExecApi {
- /** The columnar-batch type this backend is using. */
- override def batchType: Convention.BatchType = {
- VeloxBatch
- }
-
- /**
- * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
- * determine the convention (its row-based processing / columnar-batch
processing support) of a
- * plan with a user-defined function that accepts a plan then returns batch
type it outputs.
- */
- override def batchTypeFunc(): BatchOverride = {
- case i: InMemoryTableScanExec
- if i.supportsColumnar && i.relation.cacheBuilder.serializer
- .isInstanceOf[ColumnarCachedBatchSerializer] =>
- VeloxBatch
- }
-
/** Transform GetArrayItem to Substrait. */
override def genGetArrayItemTransformer(
substraitExprName: String,
diff --git
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 9e5843060..211731159 100644
---
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -22,25 +22,26 @@ import org.apache.gluten.backendsapi.velox.VeloxListenerApi;
import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.resource.ResourceInformation;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Map;
-/** For testing Velox backend without starting a Spark context. */
public abstract class VeloxBackendTestBase {
+ private static final ListenerApi API = new VeloxListenerApi();
+
@BeforeClass
public static void setup() {
- final ListenerApi api = new VeloxListenerApi();
- api.onDriverStart(mockSparkContext(), mockPluginContext());
+ API.onExecutorStart(mockPluginContext());
}
- private static SparkContext mockSparkContext() {
- // Not yet implemented.
- return null;
+ @AfterClass
+ public static void tearDown() {
+ API.onExecutorShutdown();
}
private static PluginContext mockPluginContext() {
@@ -52,9 +53,7 @@ public abstract class VeloxBackendTestBase {
@Override
public SparkConf conf() {
- final SparkConf conf = new SparkConf();
- conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
- return conf;
+ return newSparkConf();
}
@Override
@@ -83,4 +82,11 @@ public abstract class VeloxBackendTestBase {
}
};
}
+
+ @NotNull
+ private static SparkConf newSparkConf() {
+ final SparkConf conf = new SparkConf();
+ conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
+ return conf;
+ }
}
diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml
index 448bc9ee2..5e077a8b7 100644
--- a/gluten-core/pom.xml
+++ b/gluten-core/pom.xml
@@ -31,6 +31,11 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Prevent our dummy JAR from being included in Spark distributions or
uploaded to YARN -->
<dependency>
<groupId>org.apache.spark</groupId>
diff --git
a/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
b/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
index 147e69719..adbc219df 100644
--- a/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
+++ b/gluten-core/src/main/java/org/apache/gluten/exception/GlutenException.java
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.exception;
-import java.util.concurrent.Callable;
-
public class GlutenException extends RuntimeException {
public GlutenException() {}
@@ -33,12 +31,4 @@ public class GlutenException extends RuntimeException {
public GlutenException(Throwable cause) {
super(cause);
}
-
- public static <V> V wrap(Callable<V> callable) {
- try {
- return callable.call();
- } catch (Exception e) {
- throw new GlutenException(e);
- }
- }
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index b7f15d830..ea18a05d0 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -18,11 +18,13 @@ package org.apache.gluten.memory.memtarget;
import org.apache.gluten.GlutenConfig;
+import org.apache.spark.annotation.Experimental;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicLong;
+@Experimental
public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
private static final Logger LOG =
LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class);
private final MemoryTarget delegated;
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index bb1e7102b..6f7cc9bd9 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenConfig;
import org.apache.gluten.memory.MemoryUsageStatsBuilder;
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
+import org.apache.spark.annotation.Experimental;
import org.apache.spark.memory.TaskMemoryManager;
import java.util.Map;
@@ -42,6 +43,7 @@ public final class MemoryTargets {
return new OverAcquire(target, overTarget, overAcquiredRatio);
}
+ @Experimental
public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget
memoryTarget) {
if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) {
return new DynamicOffHeapSizingMemoryTarget(memoryTarget);
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala
similarity index 99%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
rename to
gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala
index 222c7f91e..7cb0b2fa2 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala
+++
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/SoftAffinityManager.scala
@@ -19,18 +19,15 @@ package org.apache.gluten.softaffinity
import org.apache.gluten.GlutenConfig
import org.apache.gluten.softaffinity.strategy.SoftAffinityStrategy
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.LogLevelUtil
-
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListenerStageCompleted,
SparkListenerStageSubmitted, SparkListenerTaskEnd}
import org.apache.spark.sql.execution.datasources.FilePartition
-
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import org.apache.gluten.logging.LogLevelUtil
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
-
import scala.collection.mutable
import scala.util.Random
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
rename to
gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
similarity index 92%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
rename to
gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
index 7a45a07b4..bc36c3b1e 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
+++
b/gluten-core/src/main/java/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.softaffinity.strategy
import org.apache.spark.internal.Logging
-import scala.collection.mutable.LinkedHashSet
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging {
@@ -32,7 +32,7 @@ class SoftAffinityStrategy extends
SoftAffinityAllocationTrait with Logging {
} else {
val candidatesSize = candidates.size
val halfCandidatesSize = candidatesSize / softAffinityReplicationNum
- val resultSet = new LinkedHashSet[(String, String)]
+ val resultSet = new mutable.LinkedHashSet[(String, String)]
// TODO: try to use ConsistentHash
val mod = file.hashCode % candidatesSize
@@ -41,7 +41,7 @@ class SoftAffinityStrategy extends
SoftAffinityAllocationTrait with Logging {
if (candidates(c1).isDefined) {
resultSet.add(candidates(c1).get)
}
- for (i <- 1 to (softAffinityReplicationNum - 1)) {
+ for (i <- 1 until softAffinityReplicationNum) {
val c2 = (c1 + halfCandidatesSize + i) % candidatesSize
if (candidates(c2).isDefined) {
resultSet.add(candidates(c2).get)
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala
similarity index 98%
rename from
gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
rename to
gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala
index cfdce5360..0e439d245 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -16,9 +16,8 @@
*/
package org.apache.spark.softaffinity
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
-import org.apache.gluten.utils.LogLevelUtil
-
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.execution.datasources.FilePartition
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala
b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala
similarity index 89%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala
rename to
gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala
index b9ffd62dc..d6b9a8d70 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/softaffinity/scheduler/SoftAffinityListener.scala
+++
b/gluten-core/src/main/java/org/apache/spark/softaffinity/SoftAffinityListener.scala
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.softaffinity.scheduler
+package org.apache.spark.softaffinity
import org.apache.gluten.softaffinity.SoftAffinityManager
-
+import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
class SoftAffinityListener extends SparkListener with Logging {
-
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
SoftAffinityManager.updateStageMap(event)
}
@@ -46,3 +45,9 @@ class SoftAffinityListener extends SparkListener with Logging
{
SoftAffinityManager.handleExecutorRemoved(execId)
}
}
+
+object SoftAffinityListener {
+ def register(sc: SparkContext): Unit = {
+ sc.listenerBus.addToStatusQueue(new SoftAffinityListener())
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
similarity index 83%
rename from gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala
rename to gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 66bab72bf..72998c0b0 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -17,19 +17,18 @@
package org.apache.gluten
import org.apache.gluten.GlutenConfig.GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
import org.apache.gluten.events.GlutenBuildInfoEvent
import org.apache.gluten.exception.GlutenException
-import
org.apache.gluten.extension.GlutenSessionExtensions.{GLUTEN_SESSION_EXTENSION_NAME,
SPARK_SESSION_EXTS_KEY}
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.gluten.task.TaskListener
-import org.apache.gluten.test.TestStats
-import org.apache.spark.{HdfsConfGenerator, SparkConf, SparkContext,
TaskFailedReason}
+import org.apache.spark.{SparkConf, SparkContext, TaskFailedReason}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin,
PluginContext, SparkPlugin}
import org.apache.spark.internal.Logging
-import org.apache.spark.listener.GlutenListenerFactory
import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.execution.ui.GlutenEventUtils
+import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.ui.{GlutenEventUtils,
GlutenSQLAppStatusListener}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.task.TaskResources
import org.apache.spark.util.SparkResourceUtil
@@ -54,23 +53,21 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
override def init(sc: SparkContext, pluginContext: PluginContext):
util.Map[String, String] = {
_sc = Some(sc)
- GlutenEventUtils.registerListener(sc)
+ GlutenSQLAppStatusListener.register(sc)
postBuildInfoEvent(sc)
val conf = pluginContext.conf()
- if (conf.getBoolean(GlutenConfig.UT_STATISTIC.key, defaultValue = false)) {
- // Only statistic in UT, not thread safe
- TestStats.beginStatistic()
- }
setPredefinedConfigs(sc, conf)
- if (BackendsApiManager.getSettings.generateHdfsConfForLibhdfs()) {
- HdfsConfGenerator.addHdfsClientToSparkWorkDirectory(sc)
+ // Initialize Backends API.
+ Backend.get().onDriverStart(sc, pluginContext)
+ if (
+ sc.getConf.getBoolean(
+ GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
+ GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)
+ ) {
+ SoftAffinityListener.register(sc)
}
- // Initialize Backends API
- BackendsApiManager.initialize()
- BackendsApiManager.getListenerApiInstance.onDriverStart(sc, pluginContext)
- GlutenListenerFactory.addToSparkListenerBus(sc)
Collections.emptyMap()
}
@@ -86,11 +83,11 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
}
override def shutdown(): Unit = {
- BackendsApiManager.getListenerApiInstance.onDriverShutdown()
+ Backend.get().onDriverShutdown()
}
private def postBuildInfoEvent(sc: SparkContext): Unit = {
- val buildInfo = BackendsApiManager.getBuildInfo
+ val buildInfo = Backend.get().buildInfo()
// export gluten version to property to spark
System.setProperty("gluten.version", VERSION)
@@ -107,10 +104,10 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
glutenBuildInfo.put("Gluten Revision Time", REVISION_TIME)
glutenBuildInfo.put("Gluten Build Time", BUILD_DATE)
glutenBuildInfo.put("Gluten Repo URL", REPO_URL)
- glutenBuildInfo.put("Backend", buildInfo.backend)
- glutenBuildInfo.put("Backend Branch", buildInfo.backendBranch)
- glutenBuildInfo.put("Backend Revision", buildInfo.backendRevision)
- glutenBuildInfo.put("Backend Revision Time", buildInfo.backendRevisionTime)
+ glutenBuildInfo.put("Backend", buildInfo.name)
+ glutenBuildInfo.put("Backend Branch", buildInfo.branch)
+ glutenBuildInfo.put("Backend Revision", buildInfo.revision)
+ glutenBuildInfo.put("Backend Revision Time", buildInfo.revisionTime)
val infoMap = glutenBuildInfo.toMap
val loggingInfo = infoMap.toSeq
.sortBy(_._1)
@@ -130,12 +127,13 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
private def setPredefinedConfigs(sc: SparkContext, conf: SparkConf): Unit = {
// sql extensions
- val extensions = if (conf.contains(SPARK_SESSION_EXTS_KEY)) {
- s"${conf.get(SPARK_SESSION_EXTS_KEY)},$GLUTEN_SESSION_EXTENSION_NAME"
+ val extensions = if
(conf.contains(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY)) {
+ s"${conf.get(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY)}," +
+ s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}"
} else {
- s"$GLUTEN_SESSION_EXTENSION_NAME"
+ s"${GlutenSessionExtensions.GLUTEN_SESSION_EXTENSION_NAME}"
}
- conf.set(SPARK_SESSION_EXTS_KEY, extensions)
+ conf.set(GlutenSessionExtensions.SPARK_SESSION_EXTS_KEY, extensions)
// adaptive custom cost evaluator class
if (GlutenConfig.getConf.enableGluten &&
GlutenConfig.getConf.enableGlutenCostEvaluator) {
@@ -230,13 +228,19 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
conservativeOffHeapPerTask.toString)
}
- // disable vanilla columnar readers, to prevent columnar-to-columnar
conversions
- if (BackendsApiManager.getSettings.disableVanillaColumnarReaders(conf)) {
+ // Disable vanilla columnar readers, to prevent columnar-to-columnar
conversions.
+ // FIXME: Do we still need this trick since
+ // https://github.com/apache/incubator-gluten/pull/1931 was merged?
+ if (
+ !conf.getBoolean(
+ GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
+ GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
+ ) {
// FIXME Hongze 22/12/06
// BatchScan.scala in shim was not always loaded by class loader.
// The file should be removed and the "ClassCastException" issue caused
by
// spark.sql.<format>.enableVectorizedReader=true should be fixed in
another way.
- // Before the issue was fixed we force the use of vanilla row reader by
using
+ // Before the issue is fixed we force the use of vanilla row reader by
using
// the following statement.
conf.set("spark.sql.parquet.enableVectorizedReader", "false")
conf.set("spark.sql.orc.enableVectorizedReader", "false")
@@ -262,15 +266,13 @@ private[gluten] class GlutenExecutorPlugin extends
ExecutorPlugin {
override def init(ctx: PluginContext, extraConf: util.Map[String, String]):
Unit = {
val conf = ctx.conf()
- // Initialize Backends API
- // TODO categorize the APIs by driver's or executor's
- BackendsApiManager.initialize()
- BackendsApiManager.getListenerApiInstance.onExecutorStart(ctx)
+ // Initialize Backends API.
+ Backend.get().onExecutorStart(ctx)
}
/** Clean up and terminate this plugin. For example: close the native
engine. */
override def shutdown(): Unit = {
- BackendsApiManager.getListenerApiInstance.onExecutorShutdown()
+ Backend.get().onExecutorShutdown()
super.shutdown()
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
similarity index 52%
copy from
gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
copy to gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
index e4f5cbdc9..5f82a2ee7 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
@@ -14,18 +14,47 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.backendsapi
+package org.apache.gluten.backend
+
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
+import org.apache.gluten.extension.injector.RuleInjector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
import java.util.ServiceLoader
import scala.collection.JavaConverters
-object BackendsApiManager {
+trait Backend {
+ import Backend._
+
+ /** Base information. */
+ def name(): String
+ def buildInfo(): BuildInfo
+
+ /** Spark listeners. */
+ def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
+ def onDriverShutdown(): Unit = {}
+ def onExecutorStart(pc: PluginContext): Unit = {}
+ def onExecutorShutdown(): Unit = {}
- private lazy val backend: Backend = initializeInternal()
+ /** The columnar-batch type this backend is using. */
+ def batchType: Convention.BatchType
- /** Initialize all backends api. */
- private def initializeInternal(): Backend = {
+ /**
+ * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
+ * determine the convention (its row-based processing / columnar-batch
processing support) of a
+ * plan with a user-defined function that accepts a plan then returns batch
type it outputs.
+ */
+ def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty
+
+ /** Query planner rules. */
+ def injectRules(injector: RuleInjector): Unit
+}
+
+object Backend {
+ private val be: Backend = {
val discoveredBackends =
JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toSeq
if (discoveredBackends.isEmpty) {
@@ -40,54 +69,9 @@ object BackendsApiManager {
backend
}
- /**
- * Automatically detect the backend api.
- * @return
- */
- def initialize(): String = {
- getBackendName
- }
-
- // Note: Do not make direct if-else checks based on output of the method.
- // Any form of backend-specific code should be avoided from appearing in
common module
- // (e.g. gluten-substrait, gluten-data)
- def getBackendName: String = {
- backend.name()
- }
-
- def getBuildInfo: BackendBuildInfo = {
- backend.buildInfo()
+ def get(): Backend = {
+ be
}
- def getListenerApiInstance: ListenerApi = {
- backend.listenerApi()
- }
-
- def getIteratorApiInstance: IteratorApi = {
- backend.iteratorApi()
- }
-
- def getSparkPlanExecApiInstance: SparkPlanExecApi = {
- backend.sparkPlanExecApi()
- }
-
- def getTransformerApiInstance: TransformerApi = {
- backend.transformerApi()
- }
-
- def getValidatorApiInstance: ValidatorApi = {
- backend.validatorApi()
- }
-
- def getMetricsApiInstance: MetricsApi = {
- backend.metricsApi()
- }
-
- def getRuleApiInstance: RuleApi = {
- backend.ruleApi()
- }
-
- def getSettings: BackendSettingsApi = {
- backend.settings
- }
+ case class BuildInfo(name: String, branch: String, revision: String,
revisionTime: String)
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
similarity index 96%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
index c5a9afec3..ce6d50350 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
@@ -16,9 +16,9 @@
*/
package org.apache.gluten.extension
-import org.apache.gluten.extension.columnar._
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier
import org.apache.gluten.extension.columnar.transition.Transitions
-import org.apache.gluten.utils.LogLevelUtil
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.vectorized.ColumnarBatch
-object ColumnarOverrideRules {
+object GlutenColumnarRule {
// Utilities to infer columnar rule's caller's property:
// ApplyColumnarRulesAndInsertTransitions#outputsColumnar.
@@ -92,14 +92,14 @@ object ColumnarOverrideRules {
}
}
-case class ColumnarOverrideRules(
+case class GlutenColumnarRule(
session: SparkSession,
applierBuilder: SparkSession => ColumnarRuleApplier)
extends ColumnarRule
with Logging
with LogLevelUtil {
- import ColumnarOverrideRules._
+ import GlutenColumnarRule._
/**
* Note: Do not implement this API. We basically inject all of Gluten's
physical rules through
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
similarity index 92%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 4456dda61..42c9cc94b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
import org.apache.gluten.extension.injector.RuleInjector
import org.apache.spark.sql.SparkSessionExtensions
@@ -27,7 +27,7 @@ import java.util.Objects
private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions
=> Unit) {
override def apply(exts: SparkSessionExtensions): Unit = {
val injector = new RuleInjector()
- BackendsApiManager.getRuleApiInstance.injectRules(injector)
+ Backend.get().injectRules(injector)
injector.inject(exts)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
similarity index 90%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 9b78ccd11..b47b2f338 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -17,9 +17,9 @@
package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.columnar.util.AdaptiveContext
+import org.apache.gluten.extension.util.AdaptiveContext
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -74,6 +74,11 @@ object ColumnarRuleApplier {
logOnLevel(transformPlanLogLevel, message(plan, out, millisTime))
out
}
+ }
+ // A temporary workaround for applying toggle `spark.gluten.enabled`, to be
removed.
+ trait SkipCondition {
+ // True if the rule execution should be skipped.
+ def skip(session: SparkSession, plan: SparkPlan): Boolean
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
similarity index 82%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 6ce4e24ed..de6ac2a7d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -17,9 +17,9 @@
package org.apache.gluten.extension.columnar.enumerated
import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
ColumnarRuleCall}
-import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.util.AdaptiveContext
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
@@ -36,20 +36,24 @@ import org.apache.spark.sql.execution.SparkPlan
* implementing them in EnumeratedTransform.
*/
@Experimental
-class EnumeratedApplier(session: SparkSession, ruleBuilders:
Seq[ColumnarRuleBuilder])
+class EnumeratedApplier(
+ session: SparkSession,
+ skipConditions: Seq[SkipCondition],
+ ruleBuilders: Seq[ColumnarRuleBuilder])
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
private val adaptiveContext = AdaptiveContext(session)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+ if (skipConditions.exists(_.skip(session, plan))) {
+ return plan
+ }
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
- PhysicalPlanSelector.maybe(session, plan) {
- val finalPlan = maybeAqe {
- apply0(ruleBuilders.map(b => b(call)), plan)
- }
- finalPlan
+ val finalPlan = maybeAqe {
+ apply0(ruleBuilders.map(b => b(call)), plan)
}
+ finalPlan
}
private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan
= {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala
similarity index 58%
copy from
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
copy to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala
index bccbd38b2..89409e5ad 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/FallbackNode.scala
@@ -14,19 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension.injector
+package org.apache.gluten.extension.columnar.heuristic
-import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
-/** Injector used to inject query planner rules into Spark and Gluten. */
-class RuleInjector {
- val spark: SparkInjector = new SparkInjector()
- val gluten: GlutenInjector = new GlutenInjector()
-
- private[extension] def inject(extensions: SparkSessionExtensions): Unit = {
- spark.inject(extensions)
- gluten.inject(extensions)
- }
+/** A wrapper to specify the plan is fallback plan, the caller side should
unwrap it. */
+case class FallbackNode(fallbackPlan: SparkPlan) extends LeafExecNode {
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = fallbackPlan.output
}
-
-object RuleInjector {}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
similarity index 80%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 85f44878f..8e612c6ae 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -17,9 +17,9 @@
package org.apache.gluten.extension.columnar.heuristic
import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
ColumnarRuleCall}
-import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
ColumnarRuleCall, SkipCondition}
+import org.apache.gluten.extension.util.AdaptiveContext
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
*/
class HeuristicApplier(
session: SparkSession,
+ skipConditions: Seq[SkipCondition],
transformBuilders: Seq[ColumnarRuleBuilder],
fallbackPolicyBuilders: Seq[ColumnarRuleBuilder],
postBuilders: Seq[ColumnarRuleBuilder],
@@ -42,28 +43,30 @@ class HeuristicApplier(
private val adaptiveContext = AdaptiveContext(session)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+ if (skipConditions.exists(_.skip(session, plan))) {
+ return plan
+ }
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
makeRule(call).apply(plan)
}
- private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] =
+ private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] = {
plan =>
- PhysicalPlanSelector.maybe(session, plan) {
- val finalPlan = prepareFallback(plan) {
- p =>
- val suggestedPlan = transformPlan("transform",
transformRules(call), p)
- transformPlan("fallback", fallbackPolicies(call), suggestedPlan)
match {
- case FallbackNode(fallbackPlan) =>
- // we should use vanilla c2r rather than native c2r,
- // and there should be no `GlutenPlan` any more,
- // so skip the `postRules()`.
- fallbackPlan
- case plan =>
- transformPlan("post", postRules(call), plan)
- }
- }
- transformPlan("final", finalRules(call), finalPlan)
+ val finalPlan = prepareFallback(plan) {
+ p =>
+ val suggestedPlan = transformPlan("transform", transformRules(call),
p)
+ transformPlan("fallback", fallbackPolicies(call), suggestedPlan)
match {
+ case FallbackNode(fallbackPlan) =>
+ // we should use vanilla c2r rather than native c2r,
+ // and there should be no `GlutenPlan` any more,
+ // so skip the `postRules()`.
+ fallbackPlan
+ case plan =>
+ transformPlan("post", postRules(call), plan)
+ }
}
+ transformPlan("final", finalRules(call), finalPlan)
+ }
private def transformPlan(
phase: String,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
similarity index 97%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index beb809474..21662f503 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -60,7 +60,7 @@ object ConventionFunc {
return PartialFunction.empty
}
}
- BackendsApiManager.getSparkPlanExecApiInstance.batchTypeFunc()
+ Backend.get().batchTypeFunc()
}
private class BuiltinFunc(o: BatchOverride) extends ConventionFunc {
@@ -86,7 +86,7 @@ object ConventionFunc {
val batchType = if (a.supportsColumnar) {
// By default, we execute columnar AQE with backend batch output.
// See
org.apache.gluten.extension.columnar.transition.InsertTransitions.apply
- BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ Backend.get().batchType
} else {
Convention.BatchType.None
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
similarity index 94%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index 65422b380..cb76ec4de 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
import org.apache.spark.sql.execution.SparkPlan
@@ -58,7 +58,7 @@ object ConventionReq {
val vanillaBatch: ConventionReq =
Impl(RowType.Any, BatchType.Is(Convention.BatchType.VanillaBatch))
lazy val backendBatch: ConventionReq =
- Impl(RowType.Any,
BatchType.Is(BackendsApiManager.getSparkPlanExecApiInstance.batchType))
+ Impl(RowType.Any, BatchType.Is(Backend.get().batchType))
def get(plan: SparkPlan): ConventionReq =
ConventionFunc.create().conventionReqOf(plan)
def of(rowType: RowType, batchType: BatchType): ConventionReq =
Impl(rowType, batchType)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
similarity index 96%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 602f0303c..3ba09efef 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
@@ -89,7 +89,7 @@ object Transitions {
}
def toBackendBatchPlan(plan: SparkPlan): SparkPlan = {
- val backendBatchType =
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ val backendBatchType = Backend.get().batchType
val out = toBatchPlan(plan, backendBatchType)
out
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
similarity index 77%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index 728e569cc..ca76e61b7 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -17,9 +17,9 @@
package org.apache.gluten.extension.injector
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.extension.columnar.ColumnarRuleApplier
-import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuilder
+import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.{ColumnarRuleBuilder,
SkipCondition}
import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
@@ -30,20 +30,25 @@ import scala.collection.mutable
/** Injector used to inject query planner rules into Gluten. */
class GlutenInjector private[injector] {
import GlutenInjector._
+ private val skipConditions: mutable.ListBuffer[SkipCondition] =
mutable.ListBuffer()
val legacy: LegacyInjector = new LegacyInjector()
val ras: RasInjector = new RasInjector()
private[injector] def inject(extensions: SparkSessionExtensions): Unit = {
- val ruleBuilder = (session: SparkSession) => new
ColumnarOverrideRules(session, applier)
+ val ruleBuilder = (session: SparkSession) => new
GlutenColumnarRule(session, applier)
extensions.injectColumnar(session => ruleBuilder(session))
}
private def applier(session: SparkSession): ColumnarRuleApplier = {
val conf = new GlutenConfig(session.sessionState.conf)
if (conf.enableRas) {
- return ras.createApplier(session)
+ return ras.createApplier(session, skipConditions.toSeq)
}
- legacy.createApplier(session)
+ legacy.createApplier(session, skipConditions.toSeq)
+ }
+
+ def skipOn(skipCondition: SkipCondition): Unit = {
+ skipConditions += skipCondition
}
}
@@ -70,9 +75,12 @@ object GlutenInjector {
finalBuilders += builder
}
- private[injector] def createApplier(session: SparkSession):
ColumnarRuleApplier = {
+ private[injector] def createApplier(
+ session: SparkSession,
+ skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
new HeuristicApplier(
session,
+ skipConditions,
transformBuilders.toSeq,
fallbackPolicyBuilders.toSeq,
postBuilders.toSeq,
@@ -87,8 +95,10 @@ object GlutenInjector {
ruleBuilders += builder
}
- private[injector] def createApplier(session: SparkSession):
ColumnarRuleApplier = {
- new EnumeratedApplier(session, ruleBuilders.toSeq)
+ private[injector] def createApplier(
+ session: SparkSession,
+ skipConditions: Seq[SkipCondition]): ColumnarRuleApplier = {
+ new EnumeratedApplier(session, skipConditions, ruleBuilders.toSeq)
}
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/injector/RuleInjector.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala
similarity index 98%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala
index de72bc4bc..b0f42e796 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/util/AdaptiveContext.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension.columnar.util
+package org.apache.gluten.extension.util
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/gluten.scala
b/gluten-core/src/main/scala/org/apache/gluten/gluten.scala
similarity index 100%
rename from gluten-substrait/src/main/scala/org/apache/gluten/gluten.scala
rename to gluten-core/src/main/scala/org/apache/gluten/gluten.scala
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala
b/gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala
similarity index 97%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala
index cbd52b2d0..07bbde2ba 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/LogLevelUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/logging/LogLevelUtil.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.utils
+package org.apache.gluten.logging
import org.apache.spark.internal.Logging
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
similarity index 100%
rename from
gluten-substrait/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
rename to
gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml
index 77bb9f3c3..a863b7957 100644
--- a/gluten-substrait/pom.xml
+++ b/gluten-substrait/pom.xml
@@ -32,11 +32,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.gluten</groupId>
- <artifactId>gluten-ui</artifactId>
- <version>${project.version}</version>
- </dependency>
<!-- Prevent our dummy JAR from being included in Spark distributions or
uploaded to YARN -->
<dependency>
<groupId>org.apache.spark</groupId>
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
index 598ab556f..37d9e09bb 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/test/TestStats.java
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.test;
+import org.apache.gluten.GlutenConfig;
+
import java.util.*;
/** Only use in UT Env. It's not thread safe. */
@@ -24,7 +26,6 @@ public class TestStats {
private static final String ROW_FORMAT =
"<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>";
- private static boolean UT_ENV = false;
private static final Map<String, CaseInfo> caseInfos = new HashMap<>();
private static String currentCase;
public static int offloadGlutenUnitNumber = 0;
@@ -35,8 +36,8 @@ public class TestStats {
public static int suiteTestNumber = 0;
public static int offloadGlutenTestNumber = 0;
- public static void beginStatistic() {
- UT_ENV = true;
+ private static boolean enabled() {
+ return GlutenConfig.getConf().collectUtStats();
}
public static void reset() {
@@ -56,7 +57,7 @@ public class TestStats {
public static int totalOffloadGlutenCaseNumber = 0;
public static void printMarkdown(String suitName) {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -105,7 +106,7 @@ public class TestStats {
}
public static void addFallBackClassName(String className) {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -117,7 +118,7 @@ public class TestStats {
}
public static void addFallBackCase() {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -127,7 +128,7 @@ public class TestStats {
}
public static void addExpressionClassName(String className) {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -138,7 +139,7 @@ public class TestStats {
}
public static Set<String> getFallBackClassName() {
- if (!UT_ENV) {
+ if (!enabled()) {
return Collections.emptySet();
}
@@ -150,7 +151,7 @@ public class TestStats {
}
public static void addIgnoreCaseName(String caseName) {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -160,7 +161,7 @@ public class TestStats {
}
public static void resetCase() {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -171,7 +172,7 @@ public class TestStats {
}
public static void startCase(String caseName) {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
@@ -180,7 +181,7 @@ public class TestStats {
}
public static void endCase(boolean status) {
- if (!UT_ENV) {
+ if (!enabled()) {
return;
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 451cb2fd2..0aa5d6ae8 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans._
@@ -72,11 +71,6 @@ trait BackendSettingsApi {
// Whether to fallback aggregate at the same time if its empty-output child
is fallen back.
def fallbackAggregateWithEmptyOutputChild(): Boolean = false
- def disableVanillaColumnarReaders(conf: SparkConf): Boolean =
- !conf.getBoolean(
- GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
- GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
-
def recreateJoinExecOnFallback(): Boolean = false
/**
@@ -141,7 +135,5 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false
- def generateHdfsConfForLibhdfs(): Boolean = false
-
def needPreComputeRangeFrameBoundary(): Boolean = false
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
index e4f5cbdc9..942058cc5 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
@@ -16,28 +16,14 @@
*/
package org.apache.gluten.backendsapi
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters
+import org.apache.gluten.backend.Backend
object BackendsApiManager {
-
- private lazy val backend: Backend = initializeInternal()
+ private lazy val backend: SubstraitBackend = initializeInternal()
/** Initialize all backends api. */
- private def initializeInternal(): Backend = {
- val discoveredBackends =
-
JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toSeq
- if (discoveredBackends.isEmpty) {
- throw new IllegalStateException("Backend implementation not discovered
from JVM classpath")
- }
- if (discoveredBackends.size != 1) {
- throw new IllegalStateException(
- s"More than one Backend implementation discovered from JVM classpath:
" +
- s"${discoveredBackends.map(_.name()).toList}")
- }
- val backend = discoveredBackends.head
- backend
+ private def initializeInternal(): SubstraitBackend = {
+ Backend.get().asInstanceOf[SubstraitBackend]
}
/**
@@ -55,10 +41,6 @@ object BackendsApiManager {
backend.name()
}
- def getBuildInfo: BackendBuildInfo = {
- backend.buildInfo()
- }
-
def getListenerApiInstance: ListenerApi = {
backend.listenerApi()
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index dd4150806..b82730e72 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
-import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
@@ -53,16 +52,6 @@ import scala.collection.JavaConverters._
trait SparkPlanExecApi {
- /** The columnar-batch type this backend is using. */
- def batchType: Convention.BatchType
-
- /**
- * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
- * determine the convention (its row-based processing / columnar-batch
processing support) of a
- * plan with a user-defined function that accepts a plan then returns batch
type it outputs.
- */
- def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty
-
/**
* Generate FilterExecTransformer.
*
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
similarity index 59%
rename from
gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
rename to
gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
index 3a5975522..d7785663d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/Backend.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
@@ -16,30 +16,34 @@
*/
package org.apache.gluten.backendsapi
-trait Backend {
- def name(): String
-
- def buildInfo(): BackendBuildInfo
-
+import org.apache.gluten.backend.Backend
+import org.apache.gluten.extension.injector.RuleInjector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
+
+trait SubstraitBackend extends Backend {
+ final override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit
= {
+ listenerApi().onDriverStart(sc, pc)
+ }
+ final override def onDriverShutdown(): Unit = {
+ listenerApi().onDriverShutdown()
+ }
+ final override def onExecutorStart(pc: PluginContext): Unit = {
+ listenerApi().onExecutorStart(pc)
+ }
+ final override def onExecutorShutdown(): Unit = {
+ listenerApi().onExecutorShutdown()
+ }
+ final override def injectRules(injector: RuleInjector): Unit = {
+ ruleApi().injectRules(injector)
+ }
def iteratorApi(): IteratorApi
-
def sparkPlanExecApi(): SparkPlanExecApi
-
def transformerApi(): TransformerApi
-
def validatorApi(): ValidatorApi
-
def metricsApi(): MetricsApi
-
def listenerApi(): ListenerApi
-
def ruleApi(): RuleApi
-
def settings(): BackendSettingsApi
}
-
-case class BackendBuildInfo(
- backend: String,
- backendBranch: String,
- backendRevision: String,
- backendRevisionTime: String)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 0c70e1ea7..14d038b15 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -17,15 +17,16 @@
package org.apache.gluten.extension
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.TransformerState
import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.RelNode
import org.apache.gluten.test.TestStats
-import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.sql.execution.SparkPlan
@@ -108,7 +109,7 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
}
protected def batchType0(): Convention.BatchType = {
- BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ Backend.get().batchType
}
protected def doValidateInternal(): ValidationResult =
ValidationResult.succeeded
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
index 29e1caae7..9c0ddac16 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
@@ -18,12 +18,10 @@ package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.heuristic.FallbackNode
import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
RowToColumnarLike, Transitions}
import org.apache.gluten.utils.PlanUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
@@ -275,9 +273,3 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
def DO_NOT_FALLBACK(): FallbackInfo = FallbackInfo()
}
}
-
-/** A wrapper to specify the plan is fallback plan, the caller side should
unwrap it. */
-case class FallbackNode(fallbackPlan: SparkPlan) extends LeafExecNode {
- override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
- override def output: Seq[Attribute] = fallbackPlan.output
-}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 3bfa611ed..69b74f3a6 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -17,7 +17,8 @@
package org.apache.gluten.extension.columnar
import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
Transitions}
-import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
+import org.apache.gluten.logging.LogLevelUtil
+import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 9ca60177c..6f1b1fa3b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -21,8 +21,9 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
+import org.apache.gluten.utils.PlanUtil
import org.apache.spark.api.python.EvalPythonExecTransformer
import org.apache.spark.internal.Logging
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 007f18fca..3737dd4af 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -19,12 +19,12 @@ package org.apache.gluten.extension.columnar.enumerated
import org.apache.gluten.extension.columnar.{OffloadExchange, OffloadJoin,
OffloadOthers}
import org.apache.gluten.extension.columnar.transition.ConventionReq
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.planner.GlutenOptimization
import org.apache.gluten.planner.cost.GlutenCostModel
import org.apache.gluten.planner.property.Conv
import org.apache.gluten.ras.property.PropertySet
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index fe7eb5566..e72677ebf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.utils
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backend.Backend
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.sql.execution._
@@ -25,7 +25,7 @@ import
org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
object PlanUtil {
private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
- Convention.get(i).batchType ==
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ Convention.get(i).batchType == Backend.get().batchType
}
def isGlutenTableCache(plan: SparkPlan): Boolean = {
@@ -44,6 +44,6 @@ object PlanUtil {
}
def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
- Convention.get(plan).batchType ==
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ Convention.get(plan).batchType == Backend.get().batchType
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
index 2cb077d50..cd063ce31 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.utils
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -26,6 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] {
+ val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) =>
+ !shouldUseGluten(session, plan)
+
override protected def validate(plan: SparkPlan): Boolean = {
BackendsApiManager.getValidatorApiInstance.doSparkPlanValidate(plan)
}
@@ -55,7 +59,7 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends
Logging {
protected def validate(plan: T): Boolean
- private[this] def shouldUseGluten(session: SparkSession, plan: T): Boolean =
{
+ def shouldUseGluten(session: SparkSession, plan: T): Boolean = {
val glutenEnabled = session.conf
.get(GlutenConfig.GLUTEN_ENABLE_KEY,
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
.toBoolean && isGlutenEnabledForCurrentThread(session)
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
b/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
deleted file mode 100644
index 721711af5..000000000
---
a/gluten-substrait/src/main/scala/org/apache/spark/listener/GlutenListenerFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.listener
-
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
-
-import org.apache.spark.SparkContext
-
-object GlutenListenerFactory {
- def addToSparkListenerBus(sc: SparkContext): Unit = {
- if (
- sc.getConf.getBoolean(
- GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
- GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE
- )
- ) {
- sc.listenerBus.addToStatusQueue(new SoftAffinityListener())
- }
- }
-}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index e5925e3ac..c57df192c 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution._
import org.apache.gluten.extension.columnar.transition.Convention
@@ -165,7 +166,7 @@ case class ColumnarInputAdapter(child: SparkPlan)
override def output: Seq[Attribute] = child.output
override def supportsColumnar: Boolean = true
override def batchType(): Convention.BatchType =
- BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ Backend.get().batchType
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
child.executeColumnar()
override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 67ecf81b9..f6e23e7cf 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.FallbackTags
-import org.apache.gluten.utils.LogLevelUtil
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
similarity index 100%
rename from
gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
rename to
gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDAFInspector.scala
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index ea3e50e81..9337317d9 100644
---
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.softaffinity
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.GlutenPartition
import org.apache.gluten.softaffinity.SoftAffinityManager
-import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanBuilder
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index 55f25309d..c91df0b6d 100644
---
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -19,12 +19,11 @@ package org.apache.spark.softaffinity
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.GlutenPartition
import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
-import org.apache.gluten.softaffinity.scheduler.SoftAffinityListener
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{SparkListenerExecutorAdded,
SparkListenerExecutorRemoved, SparkListenerStageCompleted,
SparkListenerStageSubmitted, SparkListenerTaskEnd, StageInfo, TaskInfo,
TaskLocality}
+import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
diff --git
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
index 2d61f0c4b..245c678fe 100644
---
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
+++
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenEventUtils.scala
@@ -26,12 +26,6 @@ object GlutenEventUtils {
sc.listenerBus.post(event)
}
- def registerListener(sc: SparkContext): Unit = {
- val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
- val listener = new GlutenSQLAppStatusListener(sc.conf, kvStore)
- sc.listenerBus.addToStatusQueue(listener)
- }
-
def attachUI(sc: SparkContext): Unit = {
val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
val statusStore = new GlutenSQLAppStatusStore(kvStore)
diff --git
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
index 7c236b4e8..b9a590f67 100644
---
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
+++
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.ui
import org.apache.gluten.events.{GlutenBuildInfoEvent, GlutenPlanFallbackEvent}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.sql.internal.StaticSQLConf._
@@ -102,3 +102,11 @@ class GlutenSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore)
toDelete.foreach(e => kvstore.delete(e.getClass(), e.executionId))
}
}
+
+object GlutenSQLAppStatusListener {
+ def register(sc: SparkContext): Unit = {
+ val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+ val listener = new GlutenSQLAppStatusListener(sc.conf, kvStore)
+ sc.listenerBus.addToStatusQueue(listener)
+ }
+}
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 5d171a36b..82d37b8ca 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -165,6 +165,7 @@ private object FallbackStrategiesSuite {
transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
new HeuristicApplier(
spark,
+ Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
}
testGluten("test gluten extensions") {
-    assert(
-      spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index a4da5c127..866c16d52 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -176,6 +176,7 @@ private object FallbackStrategiesSuite {
transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
new HeuristicApplier(
spark,
+ Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
}
testGluten("test gluten extensions") {
-    assert(
-      spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index a4da5c127..866c16d52 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -176,6 +176,7 @@ private object FallbackStrategiesSuite {
transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
new HeuristicApplier(
spark,
+ Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
}
testGluten("test gluten extensions") {
-    assert(
-      spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index bbdeebfe6..6318c0e06 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleBuil
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
-import org.apache.gluten.utils.QueryPlanSelector
+import org.apache.gluten.utils.{PhysicalPlanSelector, QueryPlanSelector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -177,6 +177,7 @@ private object FallbackStrategiesSuite {
transformBuilders: Seq[ColumnarRuleBuilder]): HeuristicApplier = {
new HeuristicApplier(
spark,
+ Seq(PhysicalPlanSelector.skipCond),
transformBuilders,
       List(c => ExpandFallbackPolicy(c.ac.isAdaptiveContext(), c.ac.originalPlan())),
List(
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
index 2ca7429f1..b3b8483c3 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenSessionExtensionSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.extension
-import org.apache.gluten.extension.ColumnarOverrideRules
+import org.apache.gluten.extension.GlutenColumnarRule
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
@@ -31,8 +31,7 @@ class GlutenSessionExtensionSuite extends GlutenSQLTestsTrait {
}
testGluten("test gluten extensions") {
-    assert(
-      spark.sessionState.columnarRules.map(_.getClass).contains(classOf[ColumnarOverrideRules]))
+    assert(spark.sessionState.columnarRules.map(_.getClass).contains(classOf[GlutenColumnarRule]))
assert(spark.sessionState.planner.strategies.contains(MySparkStrategy(spark)))
assert(spark.sessionState.analyzer.extendedResolutionRules.contains(MyRule(spark)))
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 927a39332..1d3c8e45d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -406,6 +406,7 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def debug: Boolean = conf.getConf(DEBUG_ENABLED)
def debugKeepJniWorkspace: Boolean = conf.getConf(DEBUG_KEEP_JNI_WORKSPACE)
+ def collectUtStats: Boolean = conf.getConf(UT_STATISTIC)
def taskStageId: Int = conf.getConf(BENCHMARK_TASK_STAGEID)
def taskPartitionId: Int = conf.getConf(BENCHMARK_TASK_PARTITIONID)
def taskId: Long = conf.getConf(BENCHMARK_TASK_TASK_ID)
@@ -835,7 +836,7 @@ object GlutenConfig {
.createWithDefault(true)
val VANILLA_VECTORIZED_READERS_ENABLED =
- buildConf("spark.gluten.sql.columnar.enableVanillaVectorizedReaders")
+ buildStaticConf("spark.gluten.sql.columnar.enableVanillaVectorizedReaders")
.internal()
.doc("Enable or disable vanilla vectorized scan.")
.booleanConf
@@ -1622,6 +1623,12 @@ object GlutenConfig {
.stringConf
.createWithDefault("/tmp")
+ val UT_STATISTIC =
+ buildStaticConf("spark.gluten.sql.ut.statistic")
+ .internal()
+ .booleanConf
+ .createWithDefault(false)
+
val BENCHMARK_TASK_STAGEID =
buildConf("spark.gluten.sql.benchmark_task.stageId")
.internal()
@@ -1680,12 +1687,6 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
- val UT_STATISTIC =
- buildConf("spark.gluten.sql.ut.statistic")
- .internal()
- .booleanConf
- .createWithDefault(false)
-
// FIXME: This only works with CH backend.
val EXTENDED_COLUMNAR_TRANSFORM_RULES =
buildConf("spark.gluten.sql.columnar.extended.columnar.transform.rules")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]