This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 232ce55443 [GLUTEN-6920][CORE][VL] New APIs and refactors to allow
different backends / components to be registered and used (#8143)
232ce55443 is described below
commit 232ce55443a7ea1daa1285a8a402e50f71334418
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Dec 6 16:29:58 2024 +0800
[GLUTEN-6920][CORE][VL] New APIs and refactors to allow different backends
/ components to be registered and used (#8143)
This is the major API layer change for #6920.
The PR introduces a new concept, Component, in Gluten and adjusts the role
of the current concept, Backend.
After the change, Backend API will become a special variant of Component
API. Details are as below:
Dependency
Component should have at least one parent. Each parent can either be
another Component or Backend.
Backend doesn't have any parents.
Component and Backend instances are used in exactly the same manner in
Gluten. Gluten treats them all as regular entities that inject custom code into
Gluten.
Having each Component's parents defined, Gluten will figure out a fixed order /
priority of using the components. For example:
A component's parent's loading APIs, namely onDriverStart /
onExecutorStart, will be guaranteed to be called EARLIER than the same APIs of
the child.
A component's parent's rule injection API, namely injectRules, will be
guaranteed to be called LATER than the same API of the child.
For legacy reasons, only one SubstraitBackend will be allowed to register at
a time, which means the CH backend and Velox backend are still not allowed
to load at the same time. When they are both loaded, an error will be thrown.
See code.
In subsequent PRs of #6920, we will:
Make iceberg / hudi / delta implement component API, remove the previous
rule injection points used by their code.
Make uniffle / celeborn implement component API, remove the previous rule
injection points used by their code.
Add an example backend that can work together with Velox backend.
Non-goals:
This doesn't intend to make the Velox backend and CH backend work together.
---
.../vectorized/CHNativeExpressionEvaluator.java | 9 +-
.../gluten/backendsapi/clickhouse/CHBackend.scala | 6 +-
.../gluten/columnarbatch/VeloxColumnarBatches.java | 15 +-
.../gluten/metrics/IteratorMetricsJniWrapper.java | 4 +-
.../org/apache/gluten/utils/VeloxBatchResizer.java | 9 +-
.../org/apache/gluten/utils/VeloxBloomFilter.java | 4 +-
.../gluten/backendsapi/velox/VeloxBackend.scala | 7 +-
.../backendsapi/velox/VeloxIteratorApi.scala | 8 +-
.../backendsapi/velox/VeloxListenerApi.scala | 2 +-
.../backendsapi/velox/VeloxTransformerApi.scala | 6 +-
.../backendsapi/velox/VeloxValidatorApi.scala | 4 +-
.../gluten/datasource/VeloxDataSourceUtil.scala | 3 +-
.../execution/ColumnarPartialProjectExec.scala | 3 +-
.../gluten/execution/RowToVeloxColumnarExec.scala | 3 +-
.../gluten/execution/VeloxColumnarToRowExec.scala | 5 +-
.../vectorized/ColumnarBatchSerializer.scala | 6 +-
.../spark/shuffle/ColumnarShuffleWriter.scala | 5 +-
.../spark/sql/execution/BroadcastUtils.scala | 8 +-
.../sql/execution/ColumnarBuildSideRelation.scala | 9 +-
.../execution/ColumnarCachedBatchSerializer.scala | 17 +-
.../velox/VeloxFormatWriterInjects.scala | 10 +-
.../spark/sql/execution/utils/ExecUtil.scala | 6 +-
.../gluten/columnarbatch/ColumnarBatchTest.java | 8 +-
cpp/core/compute/Runtime.h | 56 +++--
cpp/core/jni/JniWrapper.cc | 131 ++++++++---
cpp/core/utils/Registry.h | 6 +-
cpp/velox/jni/VeloxJniWrapper.cc | 8 +-
.../columnarbatch/ColumnarBatchJniWrapper.java | 20 +-
.../gluten/columnarbatch/ColumnarBatches.java | 18 +-
.../gluten/columnarbatch/IndicatorVector.java | 2 +-
.../gluten/columnarbatch/IndicatorVectorBase.java | 11 +-
.../gluten/init/NativeBackendInitializer.java | 28 ++-
.../gluten/vectorized/ColumnarBatchInIterator.java | 6 +-
.../gluten/vectorized/NativePlanEvaluator.java | 14 +-
.../apache/gluten/memory/NativeMemoryManager.scala | 22 +-
.../scala/org/apache/gluten/runtime/Runtime.scala | 16 +-
.../scala/org/apache/gluten/runtime/Runtimes.scala | 9 +-
.../CHCelebornColumnarShuffleWriterFactory.scala | 4 +-
.../VeloxCelebornColumnarBatchSerializer.scala | 4 +-
.../VeloxCelebornColumnarShuffleWriter.scala | 8 +-
.../scala/org/apache/gluten/GlutenPlugin.scala | 26 ++-
.../scala/org/apache/gluten/backend/Backend.scala | 57 +----
.../org/apache/gluten/backend/Component.scala | 251 +++++++++++++++++++++
.../scala/org/apache/gluten/backend/package.scala | 47 ++++
.../gluten/extension/GlutenSessionExtensions.scala | 5 +-
.../columnar/enumerated/EnumeratedTransform.scala | 5 +-
.../columnar/heuristic/HeuristicTransform.scala | 5 +-
.../columnar/transition/ConventionFunc.scala | 21 +-
.../spark/shuffle/GlutenShuffleManager.scala | 4 +
.../spark/shuffle/ShuffleManagerRegistry.scala | 5 +-
.../org/apache/spark/util/SparkTestUtil.scala | 20 +-
.../org/apache/gluten/backend/ComponentSuite.scala | 100 ++++++++
.../gluten/backendsapi/BackendsApiManager.scala | 6 +-
.../writer/VeloxUniffleColumnarShuffleWriter.java | 6 +-
54 files changed, 803 insertions(+), 275 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
index 370f434a3c..2099ddbbf5 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
@@ -17,7 +17,6 @@
package org.apache.gluten.vectorized;
import org.apache.gluten.GlutenConfig;
-import org.apache.gluten.backend.Backend;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.execution.ColumnarNativeIterator;
import org.apache.gluten.memory.CHThreadGroup;
@@ -35,11 +34,12 @@ public class CHNativeExpressionEvaluator extends
ExpressionEvaluatorJniWrapper {
// Used to initialize the native computing.
public static void initNative(scala.collection.Map<String, String> conf) {
Map<String, String> nativeConfMap =
- GlutenConfig.getNativeBackendConf(Backend.get().name(), conf);
+ GlutenConfig.getNativeBackendConf(BackendsApiManager.getBackendName(),
conf);
// Get the customer config from SparkConf for each backend
BackendsApiManager.getTransformerApiInstance()
- .postProcessNativeConfig(nativeConfMap,
GlutenConfig.prefixOf(Backend.get().name()));
+ .postProcessNativeConfig(
+ nativeConfMap,
GlutenConfig.prefixOf(BackendsApiManager.getBackendName()));
nativeInitNative(ConfigUtil.serialize(nativeConfMap));
}
@@ -54,7 +54,8 @@ public class CHNativeExpressionEvaluator extends
ExpressionEvaluatorJniWrapper {
}
private static Map<String, String> getNativeBackendConf() {
- return GlutenConfig.getNativeBackendConf(Backend.get().name(),
SQLConf.get().getAllConfs());
+ return GlutenConfig.getNativeBackendConf(
+ BackendsApiManager.getBackendName(), SQLConf.get().getAllConfs());
}
// Used by WholeStageTransform to create the native computing pipeline and
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 823ed74700..83a92db518 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component.BuildInfo
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.execution.WriteFilesExecTransformer
@@ -49,8 +49,8 @@ import scala.util.control.Breaks.{break, breakable}
class CHBackend extends SubstraitBackend {
import CHBackend._
override def name(): String = CHConf.BACKEND_NAME
- override def buildInfo(): Backend.BuildInfo =
- Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
+ override def buildInfo(): BuildInfo =
+ BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
override def iteratorApi(): IteratorApi = new CHIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi
diff --git
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
index e2035455fd..db2d08e314 100644
---
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
+++
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.columnarbatch;
+import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
@@ -59,8 +60,10 @@ public final class VeloxColumnarBatches {
return input;
}
Preconditions.checkArgument(!isVeloxBatch(input));
- final Runtime runtime =
Runtimes.contextInstance("VeloxColumnarBatches#toVeloxBatch");
- final long handle = ColumnarBatches.getNativeHandle(input);
+ final Runtime runtime =
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName(),
"VeloxColumnarBatches#toVeloxBatch");
+ final long handle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), input);
final long outHandle =
VeloxColumnarBatchJniWrapper.create(runtime).from(handle);
final ColumnarBatch output = ColumnarBatches.create(outHandle);
@@ -88,9 +91,13 @@ public final class VeloxColumnarBatches {
* Otherwise {@link UnsupportedOperationException} will be thrown.
*/
public static ColumnarBatch compose(ColumnarBatch... batches) {
- final Runtime runtime =
Runtimes.contextInstance("VeloxColumnarBatches#compose");
+ final Runtime runtime =
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName(),
"VeloxColumnarBatches#compose");
final long[] handles =
-
Arrays.stream(batches).mapToLong(ColumnarBatches::getNativeHandle).toArray();
+ Arrays.stream(batches)
+ .mapToLong(b ->
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), b))
+ .toArray();
final long handle =
VeloxColumnarBatchJniWrapper.create(runtime).compose(handles);
return ColumnarBatches.create(handle);
}
diff --git
a/backends-velox/src/main/java/org/apache/gluten/metrics/IteratorMetricsJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/metrics/IteratorMetricsJniWrapper.java
index 7250fde0fe..d03f8816ad 100644
---
a/backends-velox/src/main/java/org/apache/gluten/metrics/IteratorMetricsJniWrapper.java
+++
b/backends-velox/src/main/java/org/apache/gluten/metrics/IteratorMetricsJniWrapper.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.metrics;
+import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.RuntimeAware;
import org.apache.gluten.runtime.Runtimes;
@@ -29,7 +30,8 @@ public class IteratorMetricsJniWrapper implements
RuntimeAware {
}
public static IteratorMetricsJniWrapper create() {
- final Runtime runtime = Runtimes.contextInstance("IteratorMetrics");
+ final Runtime runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName(),
"IteratorMetrics");
return new IteratorMetricsJniWrapper(runtime);
}
diff --git
a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
index 2a6dcb43a0..73bfec08cf 100644
---
a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
+++
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.utils;
+import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;
@@ -28,10 +29,14 @@ import java.util.Iterator;
public final class VeloxBatchResizer {
public static ColumnarBatchOutIterator create(
int minOutputBatchSize, int maxOutputBatchSize, Iterator<ColumnarBatch>
in) {
- final Runtime runtime = Runtimes.contextInstance("VeloxBatchResizer");
+ final Runtime runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName(),
"VeloxBatchResizer");
long outHandle =
VeloxBatchResizerJniWrapper.create(runtime)
- .create(minOutputBatchSize, maxOutputBatchSize, new
ColumnarBatchInIterator(in));
+ .create(
+ minOutputBatchSize,
+ maxOutputBatchSize,
+ new
ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in));
return new ColumnarBatchOutIterator(runtime, outHandle);
}
}
diff --git
a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBloomFilter.java
b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBloomFilter.java
index 10179d63ed..cdf2b195cd 100644
--- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBloomFilter.java
+++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBloomFilter.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.utils;
+import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.runtime.Runtimes;
import org.apache.commons.io.IOUtils;
@@ -30,7 +31,8 @@ import java.io.OutputStream;
public class VeloxBloomFilter extends BloomFilter {
private final VeloxBloomFilterJniWrapper jni =
-
VeloxBloomFilterJniWrapper.create(Runtimes.contextInstance("VeloxBloomFilter"));
+ VeloxBloomFilterJniWrapper.create(
+ Runtimes.contextInstance(BackendsApiManager.getBackendName(),
"VeloxBloomFilter"));
private final long handle;
private VeloxBloomFilter(byte[] data) {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 45ffd77652..6bf4b6a4e2 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component.BuildInfo
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.exception.GlutenNotSupportException
@@ -52,9 +52,10 @@ import scala.util.control.Breaks.breakable
class VeloxBackend extends SubstraitBackend {
import VeloxBackend._
+
override def name(): String = VeloxBackend.BACKEND_NAME
- override def buildInfo(): Backend.BuildInfo =
- Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION,
VELOX_REVISION_TIME)
+ override def buildInfo(): BuildInfo =
+ BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME)
override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
override def iteratorApi(): IteratorApi = new VeloxIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 061daaac0f..26bc108c15 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -183,9 +183,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val columnarNativeIterators =
new JArrayList[ColumnarBatchInIterator](inputIterators.map {
- iter => new ColumnarBatchInIterator(iter.asJava)
+ iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName,
iter.asJava)
}.asJava)
- val transKernel = NativePlanEvaluator.create()
+ val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
@@ -235,10 +235,10 @@ class VeloxIteratorApi extends IteratorApi with Logging {
ExecutorManager.tryTaskSet(numaBindingInfo)
- val transKernel = NativePlanEvaluator.create()
+ val transKernel =
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
val columnarNativeIterator =
new JArrayList[ColumnarBatchInIterator](inputIterators.map {
- iter => new ColumnarBatchInIterator(iter.asJava)
+ iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName,
iter.asJava)
}.asJava)
val spillDirPath = SparkDirectoryUtil
.get()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 3a82abe618..175e34177a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -164,7 +164,7 @@ class VeloxListenerApi extends ListenerApi with Logging {
if (isDriver && !inLocalMode(conf)) {
parsed += (GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key -> "false")
}
- NativeBackendInitializer.initializeBackend(parsed)
+
NativeBackendInitializer.forBackend(VeloxBackend.BACKEND_NAME).initialize(parsed)
// Inject backend-specific implementations to override spark classes.
GlutenFormatFactory.register(new VeloxParquetWriterInjects)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
index 1687f24ce3..c6d2bc0658 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxTransformerApi.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.velox
-import org.apache.gluten.backendsapi.TransformerApi
+import org.apache.gluten.backendsapi.{BackendsApiManager, TransformerApi}
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.runtime.Runtimes
@@ -87,7 +87,9 @@ class VeloxTransformerApi extends TransformerApi with Logging
{
override def getNativePlanString(substraitPlan: Array[Byte], details:
Boolean): String = {
TaskResources.runUnsafe {
val jniWrapper = PlanEvaluatorJniWrapper.create(
- Runtimes.contextInstance("VeloxTransformerApi#getNativePlanString"))
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "VeloxTransformerApi#getNativePlanString"))
jniWrapper.nativePlanString(substraitPlan, details)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index ddf77e5fa3..9b30013661 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.velox
-import org.apache.gluten.backendsapi.ValidatorApi
+import org.apache.gluten.backendsapi.{BackendsApiManager, ValidatorApi}
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.validate.NativePlanValidationInfo
@@ -38,7 +38,7 @@ class VeloxValidatorApi extends ValidatorApi {
override def doNativeValidateWithFailureReason(plan: PlanNode):
ValidationResult = {
TaskResources.runUnsafe {
- val validator = NativePlanEvaluator.create()
+ val validator =
NativePlanEvaluator.create(BackendsApiManager.getBackendName)
asValidationResult(validator.doNativeValidateWithFailureReason(plan.toProtobuf.toByteArray))
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
index fe0d0eb0f8..7ea67feb1f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.datasource
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.utils.ArrowAbiUtil
@@ -38,7 +39,7 @@ object VeloxDataSourceUtil {
def readSchema(file: FileStatus): Option[StructType] = {
val allocator = ArrowBufferAllocators.contextInstance()
- val runtime = Runtimes.contextInstance("VeloxWriter")
+ val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"VeloxWriter")
val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
val dsHandle =
datasourceJniWrapper.init(file.getPath.toString, -1, new
util.HashMap[String, String]())
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index 20744f531b..576f3a2cb2 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -188,7 +188,8 @@ case class ColumnarPartialProjectExec(original:
ProjectExec, child: SparkPlan)(
Iterator.empty
} else {
val start = System.currentTimeMillis()
- val childData = ColumnarBatches.select(batch,
projectIndexInChild.toArray)
+ val childData = ColumnarBatches
+ .select(BackendsApiManager.getBackendName, batch,
projectIndexInChild.toArray)
val projectedBatch = getProjectedBatchArrow(childData, c2a, a2c)
val batchIterator = projectedBatch.map {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 9921ffbfab..9cdcf854db 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
@@ -121,7 +122,7 @@ object RowToVeloxColumnarExec {
val arrowSchema =
SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
- val runtime = Runtimes.contextInstance("RowToColumnar")
+ val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"RowToColumnar")
val jniWrapper = NativeRowToColumnarJniWrapper.create(runtime)
val arrowAllocator = ArrowBufferAllocators.contextInstance()
val cSchema = ArrowSchema.allocateNew(arrowAllocator)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 8aedeb87cb..4c0f79538c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.extension.ValidationResult
@@ -122,7 +123,7 @@ object VeloxColumnarToRowExec {
return Iterator.empty
}
- val runtime = Runtimes.contextInstance("ColumnarToRow")
+ val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"ColumnarToRow")
// TODO: Pass the jni jniWrapper and arrowSchema and serializeSchema
method by broadcast.
val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
val c2rId = jniWrapper.nativeColumnarToRowInit()
@@ -156,7 +157,7 @@ object VeloxColumnarToRowExec {
val cols = batch.numCols()
val rows = batch.numRows()
val beforeConvert = System.currentTimeMillis()
- val batchHandle = ColumnarBatches.getNativeHandle(batch)
+ val batchHandle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
var info =
jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 88215e36bb..cd035e3202 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.vectorized
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
@@ -99,7 +100,7 @@ private class ColumnarBatchSerializerInstance(
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull
val batchSize = GlutenConfig.getConf.maxBatchSize
val bufferSize = GlutenConfig.getConf.columnarShuffleReaderBufferSize
- val runtime = Runtimes.contextInstance("ShuffleReader")
+ val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"ShuffleReader")
val jniWrapper = ShuffleReaderJniWrapper.create(runtime)
val shuffleReaderHandle = jniWrapper.make(
cSchema.memoryAddress(),
@@ -135,7 +136,8 @@ private class ColumnarBatchSerializerInstance(
extends DeserializationStream
with TaskResource {
private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
- private val runtime = Runtimes.contextInstance("ShuffleReader")
+ private val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"ShuffleReader")
private val wrappedOut: ClosableIterator = new ColumnarBatchOutIterator(
runtime,
ShuffleReaderJniWrapper
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index eaf9d99a9e..e9f821512e 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
import org.apache.gluten.runtime.Runtimes
@@ -99,7 +100,7 @@ class ColumnarShuffleWriter[K, V](
private val reallocThreshold =
GlutenConfig.getConf.columnarShuffleReallocThreshold
- private val runtime = Runtimes.contextInstance("ShuffleWriter")
+ private val runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName, "ShuffleWriter")
private val jniWrapper = ShuffleWriterJniWrapper.create(runtime)
@@ -135,7 +136,7 @@ class ColumnarShuffleWriter[K, V](
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols}
cols")
} else {
val rows = cb.numRows()
- val handle = ColumnarBatches.getNativeHandle(cb)
+ val handle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, cb)
if (nativeShuffleWriter == -1L) {
nativeShuffleWriter = jniWrapper.make(
dep.nativePartitioning.getShortName,
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
index 5bc6b7c56d..11a8cc9809 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -152,11 +153,14 @@ object BroadcastUtils {
if (filtered.isEmpty) {
return ColumnarBatchSerializeResult.EMPTY
}
- val handleArray = filtered.map(ColumnarBatches.getNativeHandle)
+ val handleArray =
+ filtered.map(b =>
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, b))
val serializeResult =
try {
ColumnarBatchSerializerJniWrapper
- .create(Runtimes.contextInstance("BroadcastUtils#serializeStream"))
+ .create(
+ Runtimes
+ .contextInstance(BackendsApiManager.getBackendName,
"BroadcastUtils#serializeStream"))
.serialize(handleArray)
} finally {
filtered.foreach(ColumnarBatches.release)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index d5c848dd97..fa3d348967 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
@@ -40,7 +41,8 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute],
batches: Array[Arra
extends BuildSideRelation {
override def deserialized: Iterator[ColumnarBatch] = {
- val runtime = Runtimes.contextInstance("BuildSideRelation#deserialized")
+ val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"BuildSideRelation#deserialized")
val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
val serializeHandle: Long = {
val allocator = ArrowBufferAllocators.contextInstance()
@@ -86,7 +88,8 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute],
batches: Array[Arra
* was called in Spark Driver, should manage resources carefully.
*/
override def transform(key: Expression): Array[InternalRow] =
TaskResources.runUnsafe {
- val runtime = Runtimes.contextInstance("BuildSideRelation#transform")
+ val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"BuildSideRelation#transform")
// This transformation happens in Spark driver, thus resources can not be
managed automatically.
val serializerJniWrapper =
ColumnarBatchSerializerJniWrapper.create(runtime)
val serializeHandle = {
@@ -150,7 +153,7 @@ case class ColumnarBuildSideRelation(output:
Seq[Attribute], batches: Array[Arra
var info =
jniWrapper.nativeColumnarToRowConvert(
c2rId,
- ColumnarBatches.getNativeHandle(batch),
+
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch),
0)
batch.close()
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 6fd346f64c..64ad105c7f 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -176,8 +176,12 @@ class ColumnarCachedBatchSerializer extends
CachedBatchSerializer with SQLConfHe
val batch = it.next()
val results =
ColumnarBatchSerializerJniWrapper
-
.create(Runtimes.contextInstance("ColumnarCachedBatchSerializer#serialize"))
- .serialize(Array(ColumnarBatches.getNativeHandle(batch)))
+ .create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ .serialize(
+
Array(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName,
batch)))
CachedColumnarBatch(
results.getNumRows.toInt,
results.getSerialized.length,
@@ -201,7 +205,9 @@ class ColumnarCachedBatchSerializer extends
CachedBatchSerializer with SQLConfHe
val timezoneId = SQLConf.get.sessionLocalTimeZone
input.mapPartitions {
it =>
- val runtime =
Runtimes.contextInstance("ColumnarCachedBatchSerializer#read")
+ val runtime = Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#read")
val jniWrapper = ColumnarBatchSerializerJniWrapper
.create(runtime)
val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
@@ -224,7 +230,10 @@ class ColumnarCachedBatchSerializer extends
CachedBatchSerializer with SQLConfHe
val batch = ColumnarBatches.create(batchHandle)
if (shouldSelectAttributes) {
try {
- ColumnarBatches.select(batch, requestedColumnIndices.toArray)
+ ColumnarBatches.select(
+ BackendsApiManager.getBackendName,
+ batch,
+ requestedColumnIndices.toArray)
} finally {
batch.close()
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index cd5f442bc7..c9651557fa 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.velox
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.datasource.{VeloxDataSourceJniWrapper,
VeloxDataSourceUtil}
import org.apache.gluten.exception.GlutenException
@@ -57,7 +58,7 @@ trait VeloxFormatWriterInjects extends
GlutenFormatWriterInjectsBase {
SparkArrowUtil.toArrowSchema(dataSchema,
SQLConf.get.sessionLocalTimeZone)
val cSchema =
ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
var dsHandle = -1L
- val runtime = Runtimes.contextInstance("VeloxWriter")
+ val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName,
"VeloxWriter")
val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
val allocator = ArrowBufferAllocators.contextInstance()
try {
@@ -77,7 +78,7 @@ trait VeloxFormatWriterInjects extends
GlutenFormatWriterInjectsBase {
ColumnarBatches.retain(batch)
val batchHandle = {
ColumnarBatches.checkOffloaded(batch)
- ColumnarBatches.getNativeHandle(batch)
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName,
batch)
}
datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
batch.close()
@@ -108,8 +109,9 @@ class VeloxRowSplitter extends GlutenRowSplitter {
partitionColIndice: Array[Int],
hasBucket: Boolean,
reserve_partition_columns: Boolean = false): BlockStripes = {
- val handler = ColumnarBatches.getNativeHandle(row.batch)
- val runtime = Runtimes.contextInstance("VeloxPartitionWriter")
+ val handler =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, row.batch)
+ val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"VeloxPartitionWriter")
val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
val originalColumns: Array[Int] = Array.range(0, row.batch.numCols())
val dataColIndice =
originalColumns.filterNot(partitionColIndice.contains(_))
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index 32bac02045..e3a84d4f07 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.utils
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
@@ -41,10 +42,11 @@ import org.apache.spark.util.MutablePair
object ExecUtil {
def convertColumnarToRow(batch: ColumnarBatch): Iterator[InternalRow] = {
- val runtime = Runtimes.contextInstance("ExecUtil#ColumnarToRow")
+ val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"ExecUtil#ColumnarToRow")
val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
var info: NativeColumnarToRowInfo = null
- val batchHandle = ColumnarBatches.getNativeHandle(batch)
+ val batchHandle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
val c2rHandle = jniWrapper.nativeColumnarToRowInit()
info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle, 0)
diff --git
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index 54803aa193..f02caf8f2d 100644
---
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.columnarbatch;
+import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.execution.RowToVeloxColumnarExec;
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.test.VeloxBackendTestBase;
@@ -114,9 +115,12 @@ public class ColumnarBatchTest extends
VeloxBackendTestBase {
final ColumnarBatch offloaded =
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(),
batch);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
- final long handle = ColumnarBatches.getNativeHandle(offloaded);
+ final long handle =
+
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), offloaded);
final ColumnarBatch created = ColumnarBatches.create(handle);
- Assert.assertEquals(handle,
ColumnarBatches.getNativeHandle(created));
+ Assert.assertEquals(
+ handle,
+
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), created));
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
Assert.assertEquals(1, ColumnarBatches.getRefCnt(created));
ColumnarBatches.retain(created);
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 3090652b81..16acada54f 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -80,11 +80,17 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
return kind_;
}
- virtual void parsePlan(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) = 0;
+ virtual void parsePlan(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
+ throw GlutenException("Not implemented");
+ }
- virtual void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) = 0;
+ virtual void parseSplitInfo(const uint8_t* data, int32_t size,
std::optional<std::string> dumpFile) {
+ throw GlutenException("Not implemented");
+ }
- virtual std::string planString(bool details, const
std::unordered_map<std::string, std::string>& sessionConf) = 0;
+ virtual std::string planString(bool details, const
std::unordered_map<std::string, std::string>& sessionConf) {
+ throw GlutenException("Not implemented");
+ }
// Just for benchmark
::substrait::Plan& getPlan() {
@@ -94,11 +100,17 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
virtual std::shared_ptr<ResultIterator> createResultIterator(
const std::string& spillDir,
const std::vector<std::shared_ptr<ResultIterator>>& inputs,
- const std::unordered_map<std::string, std::string>& sessionConf) = 0;
+ const std::unordered_map<std::string, std::string>& sessionConf) {
+ throw GlutenException("Not implemented");
+ }
- virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t
numRows) = 0;
+ virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t
numRows) {
+ throw GlutenException("Not implemented");
+ }
- virtual std::shared_ptr<ColumnarBatch>
select(std::shared_ptr<ColumnarBatch>, const std::vector<int32_t>&) = 0;
+ virtual std::shared_ptr<ColumnarBatch>
select(std::shared_ptr<ColumnarBatch>, const std::vector<int32_t>&) {
+ throw GlutenException("Not implemented");
+ }
virtual MemoryManager* memoryManager() {
return memoryManager_;
@@ -106,26 +118,42 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
/// This function is used to create certain converter from the format used by
/// the backend to Spark unsafe row.
- virtual std::shared_ptr<ColumnarToRowConverter>
createColumnar2RowConverter(int64_t column2RowMemThreshold) = 0;
+ virtual std::shared_ptr<ColumnarToRowConverter>
createColumnar2RowConverter(int64_t column2RowMemThreshold) {
+ throw GlutenException("Not implemented");
+ }
- virtual std::shared_ptr<RowToColumnarConverter>
createRow2ColumnarConverter(struct ArrowSchema* cSchema) = 0;
+ virtual std::shared_ptr<RowToColumnarConverter>
createRow2ColumnarConverter(struct ArrowSchema* cSchema) {
+ throw GlutenException("Not implemented");
+ }
virtual std::shared_ptr<ShuffleWriter> createShuffleWriter(
int numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
- ShuffleWriterOptions options) = 0;
+ ShuffleWriterOptions options) {
+ throw GlutenException("Not implemented");
+ }
- virtual Metrics* getMetrics(ColumnarBatchIterator* rawIter, int64_t
exportNanos) = 0;
+ virtual Metrics* getMetrics(ColumnarBatchIterator* rawIter, int64_t
exportNanos) {
+ throw GlutenException("Not implemented");
+ }
virtual std::shared_ptr<ShuffleReader> createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
- ShuffleReaderOptions options) = 0;
+ ShuffleReaderOptions options) {
+ throw GlutenException("Not implemented");
+ }
- virtual std::unique_ptr<ColumnarBatchSerializer>
createColumnarBatchSerializer(struct ArrowSchema* cSchema) = 0;
+ virtual std::unique_ptr<ColumnarBatchSerializer>
createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
+ throw GlutenException("Not implemented");
+ }
- virtual void dumpConf(const std::string& path) = 0;
+ virtual void dumpConf(const std::string& path) {
+ throw GlutenException("Not implemented");
+ }
- virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string&
path) = 0;
+ virtual std::shared_ptr<ArrowWriter> createArrowWriter(const std::string&
path) {
+ throw GlutenException("Not implemented");
+ };
const std::unordered_map<std::string, std::string>& getConfMap() {
return confMap_;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index b7bdae8175..f75b16b461 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -41,32 +41,33 @@
using namespace gluten;
-static jclass javaReservationListenerClass;
+namespace {
+jclass javaReservationListenerClass;
-static jmethodID reserveMemoryMethod;
-static jmethodID unreserveMemoryMethod;
+jmethodID reserveMemoryMethod;
+jmethodID unreserveMemoryMethod;
-static jclass byteArrayClass;
+jclass byteArrayClass;
-static jclass jniByteInputStreamClass;
-static jmethodID jniByteInputStreamRead;
-static jmethodID jniByteInputStreamTell;
-static jmethodID jniByteInputStreamClose;
+jclass jniByteInputStreamClass;
+jmethodID jniByteInputStreamRead;
+jmethodID jniByteInputStreamTell;
+jmethodID jniByteInputStreamClose;
-static jclass splitResultClass;
-static jmethodID splitResultConstructor;
+jclass splitResultClass;
+jmethodID splitResultConstructor;
-static jclass columnarBatchSerializeResultClass;
-static jmethodID columnarBatchSerializeResultConstructor;
+jclass columnarBatchSerializeResultClass;
+jmethodID columnarBatchSerializeResultConstructor;
-static jclass metricsBuilderClass;
-static jmethodID metricsBuilderConstructor;
-static jclass nativeColumnarToRowInfoClass;
-static jmethodID nativeColumnarToRowInfoConstructor;
+jclass metricsBuilderClass;
+jmethodID metricsBuilderConstructor;
+jclass nativeColumnarToRowInfoClass;
+jmethodID nativeColumnarToRowInfoConstructor;
-static jclass shuffleReaderMetricsClass;
-static jmethodID shuffleReaderMetricsSetDecompressTime;
-static jmethodID shuffleReaderMetricsSetDeserializeTime;
+jclass shuffleReaderMetricsClass;
+jmethodID shuffleReaderMetricsSetDecompressTime;
+jmethodID shuffleReaderMetricsSetDeserializeTime;
class JavaInputStreamAdaptor final : public arrow::io::InputStream {
public:
@@ -140,6 +141,61 @@ class JavaInputStreamAdaptor final : public
arrow::io::InputStream {
bool closed_ = false;
};
+/// The internal backend consists of empty implementations of the Runtime API and
MemoryManager API.
+/// The backend is used for saving contextual objects only.
+///
+/// It's also possible to extend the implementation to handle Arrow-based
requests in the future.
+inline static const std::string kInternalBackendKind{"internal"};
+
+class InternalMemoryManager : public MemoryManager {
+ public:
+ InternalMemoryManager(const std::string& kind) : MemoryManager(kind) {}
+
+ arrow::MemoryPool* getArrowMemoryPool() override {
+ throw GlutenException("Not implemented");
+ }
+
+ const MemoryUsageStats collectMemoryUsageStats() const override {
+ return MemoryUsageStats();
+ }
+
+ const int64_t shrink(int64_t size) override {
+ return 0;
+ }
+
+ void hold() override {}
+};
+
+class InternalRuntime : public Runtime {
+ public:
+ InternalRuntime(
+ const std::string& kind,
+ MemoryManager* memoryManager,
+ const std::unordered_map<std::string, std::string>& confMap)
+ : Runtime(kind, memoryManager, confMap) {}
+};
+
+MemoryManager* internalMemoryManagerFactory(const std::string& kind,
std::unique_ptr<AllocationListener> listener) {
+ return new InternalMemoryManager(kind);
+}
+
+void internalMemoryManagerReleaser(MemoryManager* memoryManager) {
+ delete memoryManager;
+}
+
+Runtime* internalRuntimeFactory(
+ const std::string& kind,
+ MemoryManager* memoryManager,
+ const std::unordered_map<std::string, std::string>& sessionConf) {
+ return new InternalRuntime(kind, memoryManager, sessionConf);
+}
+
+void internalRuntimeReleaser(Runtime* runtime) {
+ delete runtime;
+}
+
+} // namespace
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -152,6 +208,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
getJniCommonState()->ensureInitialized(env);
getJniErrorState()->ensureInitialized(env);
+ MemoryManager::registerFactory(kInternalBackendKind,
internalMemoryManagerFactory, internalMemoryManagerReleaser);
+ Runtime::registerFactory(kInternalBackendKind, internalRuntimeFactory,
internalRuntimeReleaser);
+
byteArrayClass = createGlobalClassReferenceOrError(env, "[B");
jniByteInputStreamClass = createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/JniByteInputStream;");
@@ -275,11 +334,11 @@ JNIEXPORT jbyteArray JNICALL
Java_org_apache_gluten_memory_NativeMemoryManagerJn
const MemoryUsageStats& stats = memoryManager->collectMemoryUsageStats();
auto size = stats.ByteSizeLong();
jbyteArray out = env->NewByteArray(size);
- uint8_t buffer[size];
+ std::vector<uint8_t> buffer(size);
GLUTEN_CHECK(
- stats.SerializeToArray(reinterpret_cast<void*>(buffer), size),
+ stats.SerializeToArray(reinterpret_cast<void*>(buffer.data()), size),
"Serialization failed when collecting memory usage stats");
- env->SetByteArrayRegion(out, 0, size, reinterpret_cast<jbyte*>(buffer));
+ env->SetByteArrayRegion(out, 0, size,
reinterpret_cast<jbyte*>(buffer.data()));
return out;
JNI_METHOD_END(nullptr)
}
@@ -650,7 +709,7 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_NativeRowToColumnarJniW
JNIEXPORT jstring JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_getType( // NOLINT
JNIEnv* env,
- jobject wrapper,
+ jclass,
jlong batchHandle) {
JNI_METHOD_START
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
@@ -660,7 +719,7 @@ JNIEXPORT jstring JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniW
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numBytes( // NOLINT
JNIEnv* env,
- jobject wrapper,
+ jclass,
jlong batchHandle) {
JNI_METHOD_START
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
@@ -670,7 +729,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numColumns( //
NOLINT
JNIEnv* env,
- jobject wrapper,
+ jclass,
jlong batchHandle) {
JNI_METHOD_START
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
@@ -680,7 +739,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_numRows( // NOLINT
JNIEnv* env,
- jobject wrapper,
+ jclass,
jlong batchHandle) {
JNI_METHOD_START
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
@@ -690,7 +749,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNIEXPORT void JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_exportToArrow( //
NOLINT
JNIEnv* env,
- jobject wrapper,
+ jclass,
jlong batchHandle,
jlong cSchema,
jlong cArray) {
@@ -703,6 +762,15 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrap
JNI_METHOD_END()
}
+JNIEXPORT void JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_close( // NOLINT
+ JNIEnv* env,
+ jclass,
+ jlong batchHandle) {
+ JNI_METHOD_START
+ ObjectStore::release(batchHandle);
+ JNI_METHOD_END()
+}
+
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_createWithArrowArray(
// NOLINT
JNIEnv* env,
jobject wrapper,
@@ -752,15 +820,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNI_METHOD_END(kInvalidObjectHandle)
}
-JNIEXPORT void JNICALL
Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_close( // NOLINT
- JNIEnv* env,
- jobject wrapper,
- jlong batchHandle) {
- JNI_METHOD_START
- ObjectStore::release(batchHandle);
- JNI_METHOD_END()
-}
-
// Shuffle
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrapper_nativeMake( // NOLINT
JNIEnv* env,
diff --git a/cpp/core/utils/Registry.h b/cpp/core/utils/Registry.h
index e50eb6763d..a325a05e5a 100644
--- a/cpp/core/utils/Registry.h
+++ b/cpp/core/utils/Registry.h
@@ -29,19 +29,19 @@ class Registry {
public:
void registerObj(const std::string& kind, T t) {
std::lock_guard<std::mutex> l(mutex_);
- GLUTEN_CHECK(map_.find(kind) == map_.end(), "Already registered for " +
kind);
+ GLUTEN_CHECK(map_.find(kind) == map_.end(), "Required object already
registered for " + kind);
map_[kind] = std::move(t);
}
T& get(const std::string& kind) {
std::lock_guard<std::mutex> l(mutex_);
- GLUTEN_CHECK(map_.find(kind) != map_.end(), "Not registered for " + kind);
+ GLUTEN_CHECK(map_.find(kind) != map_.end(), "Required object not
registered for " + kind);
return map_[kind];
}
bool unregisterObj(const std::string& kind) {
std::lock_guard<std::mutex> l(mutex_);
- GLUTEN_CHECK(map_.find(kind) != map_.end(), "Not registered for " + kind);
+ GLUTEN_CHECK(map_.find(kind) != map_.end(), "Required object not
registered for " + kind);
return map_.erase(kind);
}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 6ea60d651a..9d6ad157ff 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -41,13 +41,15 @@
using namespace gluten;
using namespace facebook;
+namespace {
+jclass blockStripesClass;
+jmethodID blockStripesConstructor;
+} // namespace
+
#ifdef __cplusplus
extern "C" {
#endif
-static jclass blockStripesClass;
-static jmethodID blockStripesConstructor;
-
jint JNI_OnLoad(JavaVM* vm, void*) {
JNIEnv* env;
if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java
index 94312a2cf5..d69e84c3a1 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java
@@ -30,24 +30,26 @@ public class ColumnarBatchJniWrapper implements
RuntimeAware {
return new ColumnarBatchJniWrapper(runtime);
}
- public native long createWithArrowArray(long cSchema, long cArray);
+ // Static methods.
+ public static native String getType(long batch);
- public native long getForEmptySchema(int numRows);
+ public static native long numColumns(long batch);
+
+ public static native long numRows(long batch);
- public native String getType(long batch);
+ public static native long numBytes(long batch);
- public native long numColumns(long batch);
+ public static native void exportToArrow(long batch, long cSchema, long
cArray);
- public native long numRows(long batch);
+ public static native void close(long batch);
- public native long numBytes(long batch);
+ // Member methods in which native code relies on the backend's runtime API
implementation.
+ public native long createWithArrowArray(long cSchema, long cArray);
- public native void exportToArrow(long batch, long cSchema, long cArray);
+ public native long getForEmptySchema(int numRows);
public native long select(long batch, int[] columnIndices);
- public native void close(long batch);
-
@Override
public long rtHandle() {
return runtime.getHandle();
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
index 04236884a1..3914fb155e 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java
@@ -45,6 +45,7 @@ import java.util.NoSuchElementException;
import scala.collection.JavaConverters;
public final class ColumnarBatches {
+ private static final String INTERNAL_BACKEND_KIND = "internal";
private ColumnarBatches() {}
@@ -108,8 +109,8 @@ public final class ColumnarBatches {
* This method will always return a Velox-based ColumnarBatch. This method
will close the input
* column batch.
*/
- public static ColumnarBatch select(ColumnarBatch batch, int[] columnIndices)
{
- final Runtime runtime = Runtimes.contextInstance("ColumnarBatches#select");
+ public static ColumnarBatch select(String backendName, ColumnarBatch batch,
int[] columnIndices) {
+ final Runtime runtime = Runtimes.contextInstance(backendName,
"ColumnarBatches#select");
switch (identifyBatchType(batch)) {
case LIGHT:
final IndicatorVector iv = getIndicatorVector(batch);
@@ -188,8 +189,8 @@ public final class ColumnarBatches {
ArrowArray cArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
-
ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatches#load"))
- .exportToArrow(iv.handle(), cSchema.memoryAddress(),
cArray.memoryAddress());
+ ColumnarBatchJniWrapper.exportToArrow(
+ iv.handle(), cSchema.memoryAddress(), cArray.memoryAddress());
Data.exportSchema(
allocator, ArrowUtil.toArrowSchema(cSchema, allocator, provider),
provider, arrowSchema);
@@ -229,7 +230,10 @@ public final class ColumnarBatches {
if (input.numCols() == 0) {
throw new IllegalArgumentException("batch with zero columns cannot be
offloaded");
}
- final Runtime runtime =
Runtimes.contextInstance("ColumnarBatches#offload");
+ // Batch-offloading doesn't involve any backend-specific native code. Use
the internal
+ // backend to store native batch references only.
+ final Runtime runtime =
+ Runtimes.contextInstance(INTERNAL_BACKEND_KIND,
"ColumnarBatches#offload");
try (ArrowArray cArray = ArrowArray.allocateNew(allocator);
ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
ArrowAbiUtil.exportFromSparkColumnarBatch(allocator, input, cSchema,
cArray);
@@ -383,11 +387,11 @@ public final class ColumnarBatches {
return (IndicatorVector) input.column(0);
}
- public static long getNativeHandle(ColumnarBatch batch) {
+ public static long getNativeHandle(String backendName, ColumnarBatch batch) {
if (isZeroColumnBatch(batch)) {
final ColumnarBatchJniWrapper jniWrapper =
ColumnarBatchJniWrapper.create(
- Runtimes.contextInstance("ColumnarBatches#getNativeHandle"));
+ Runtimes.contextInstance(backendName,
"ColumnarBatches#getNativeHandle"));
return jniWrapper.getForEmptySchema(batch.numRows());
}
return getIndicatorVector(batch).handle();
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java
index 3f09a3619b..251925d035 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVector.java
@@ -54,7 +54,7 @@ public class IndicatorVector extends IndicatorVectorBase {
}
if (refCnt.decrementAndGet() == 0) {
pool.remove(handle);
- jniWrapper.close(handle);
+ ColumnarBatchJniWrapper.close(handle);
}
}
}
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java
index 700eb3cade..e0e266ca04 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorBase.java
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.columnarbatch;
-import org.apache.gluten.runtime.Runtimes;
-
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -26,26 +24,23 @@ import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
public abstract class IndicatorVectorBase extends ColumnVector {
- protected final ColumnarBatchJniWrapper jniWrapper;
protected final long handle;
protected IndicatorVectorBase(long handle) {
super(DataTypes.NullType);
- this.jniWrapper =
-
ColumnarBatchJniWrapper.create(Runtimes.contextInstance("IndicatorVectorBase#init"));
this.handle = handle;
}
public String getType() {
- return jniWrapper.getType(handle);
+ return ColumnarBatchJniWrapper.getType(handle);
}
public long getNumColumns() {
- return jniWrapper.numColumns(handle);
+ return ColumnarBatchJniWrapper.numColumns(handle);
}
public long getNumRows() {
- return jniWrapper.numRows(handle);
+ return ColumnarBatchJniWrapper.numRows(handle);
}
abstract long refCnt();
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
index f85187fae6..fe1ebc7634 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java
@@ -17,7 +17,6 @@
package org.apache.gluten.init;
import org.apache.gluten.GlutenConfig;
-import org.apache.gluten.backend.Backend;
import org.apache.gluten.utils.ConfigUtil;
import org.apache.spark.util.SparkShutdownManagerUtil;
@@ -25,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import scala.runtime.BoxedUnit;
@@ -32,7 +32,18 @@ import scala.runtime.BoxedUnit;
// Initialize native backend before calling any native methods from Java side.
public final class NativeBackendInitializer {
private static final Logger LOG =
LoggerFactory.getLogger(NativeBackendInitializer.class);
- private static final AtomicBoolean initialized = new AtomicBoolean(false);
+ private static final Map<String, NativeBackendInitializer> instances = new
ConcurrentHashMap<>();
+
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private final String backendName;
+
+ private NativeBackendInitializer(String backendName) {
+ this.backendName = backendName;
+ }
+
+ public static NativeBackendInitializer forBackend(String backendName) {
+ return instances.computeIfAbsent(backendName, k -> new
NativeBackendInitializer(backendName));
+ }
// Spark DriverPlugin/ExecutorPlugin will only invoke
NativeBackendInitializer#initializeBackend
// method once in its init method.
@@ -41,7 +52,7 @@ public final class NativeBackendInitializer {
// In local mode, NativeBackendInitializer#initializeBackend will be invoked
twice in same
// thread, driver first then executor, initialized flag ensure only invoke
initializeBackend once,
// so there are no race condition here.
- public static void initializeBackend(scala.collection.Map<String, String>
conf) {
+ public void initialize(scala.collection.Map<String, String> conf) {
if (!initialized.compareAndSet(false, true)) {
// Already called.
return;
@@ -54,10 +65,9 @@ public final class NativeBackendInitializer {
});
}
- private static void initialize0(scala.collection.Map<String, String> conf) {
+ private void initialize0(scala.collection.Map<String, String> conf) {
try {
- Map<String, String> nativeConfMap =
- GlutenConfig.getNativeBackendConf(Backend.get().name(), conf);
+ Map<String, String> nativeConfMap =
GlutenConfig.getNativeBackendConf(backendName, conf);
initialize(ConfigUtil.serialize(nativeConfMap));
} catch (Exception e) {
LOG.error("Failed to call native backend's initialize method", e);
@@ -65,9 +75,7 @@ public final class NativeBackendInitializer {
}
}
- private static native void initialize(byte[] configPlan);
-
- private static native void shutdown();
+ private native void initialize(byte[] configPlan);
- private NativeBackendInitializer() {}
+ private native void shutdown();
}
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
index f95324fad9..378927bec4 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchInIterator.java
@@ -23,9 +23,11 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.util.Iterator;
public class ColumnarBatchInIterator {
+ private final String backendName;
private final Iterator<ColumnarBatch> delegated;
- public ColumnarBatchInIterator(Iterator<ColumnarBatch> delegated) {
+ public ColumnarBatchInIterator(String backendName, Iterator<ColumnarBatch>
delegated) {
+ this.backendName = backendName;
this.delegated = delegated;
}
@@ -38,6 +40,6 @@ public class ColumnarBatchInIterator {
public long next() {
final ColumnarBatch next = delegated.next();
ColumnarBatches.checkOffloaded(next);
- return ColumnarBatches.getNativeHandle(next);
+ return ColumnarBatches.getNativeHandle(backendName, next);
}
}
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 1c03c415ed..bfa059b5ae 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -32,17 +32,19 @@ import java.util.concurrent.atomic.AtomicInteger;
public class NativePlanEvaluator {
private static final AtomicInteger id = new AtomicInteger(0);
- private final Runtime runtime =
- Runtimes.contextInstance(String.format("NativePlanEvaluator-%d",
id.getAndIncrement()));
+ private final Runtime runtime;
private final PlanEvaluatorJniWrapper jniWrapper;
- private NativePlanEvaluator() {
- jniWrapper = PlanEvaluatorJniWrapper.create(runtime);
+ private NativePlanEvaluator(Runtime runtime) {
+ this.runtime = runtime;
+ this.jniWrapper = PlanEvaluatorJniWrapper.create(runtime);
}
- public static NativePlanEvaluator create() {
- return new NativePlanEvaluator();
+ public static NativePlanEvaluator create(String backendName) {
+ return new NativePlanEvaluator(
+ Runtimes.contextInstance(
+ backendName, String.format("NativePlanEvaluator-%d",
id.getAndIncrement())));
}
public NativePlanValidationInfo doNativeValidateWithFailureReason(byte[]
subPlan) {
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 409c4297fa..e65d3c4d95 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.memory
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.memory.listener.ReservationListeners
import org.apache.gluten.memory.memtarget.{KnownNameAndStats, MemoryTarget,
Spiller, Spillers}
@@ -42,18 +41,19 @@ trait NativeMemoryManager {
}
object NativeMemoryManager {
- private class Impl(name: String) extends NativeMemoryManager with
TaskResource {
+ private class Impl(backendName: String, name: String)
+ extends NativeMemoryManager
+ with TaskResource {
private val LOGGER = LoggerFactory.getLogger(classOf[NativeMemoryManager])
private val spillers = Spillers.appendable()
private val mutableStats: mutable.Map[String, MemoryUsageStatsBuilder] =
mutable.Map()
private val rl = ReservationListeners.create(name, spillers,
mutableStats.asJava)
private val handle = NativeMemoryManagerJniWrapper.create(
- Backend.get().name(),
+ backendName,
rl,
ConfigUtil.serialize(
- GlutenConfig.getNativeSessionConf(
- Backend.get().name(),
- GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
+ GlutenConfig
+ .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
)
spillers.append(new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size:
Long): Long = {
@@ -109,11 +109,15 @@ object NativeMemoryManager {
))
}
}
- override def priority(): Int = 0
+ override def priority(): Int = {
+ // Memory managers should be released after all runtimes are released.
+ // So lower the priority to 0.
+ 0
+ }
override def resourceName(): String = "nmm"
}
- def apply(name: String): NativeMemoryManager = {
- TaskResources.addAnonymousResource(new Impl(name))
+ def apply(backendName: String, name: String): NativeMemoryManager = {
+ TaskResources.addAnonymousResource(new Impl(backendName, name))
}
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index efd4928cfd..8741c12474 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.runtime
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.memory.NativeMemoryManager
import org.apache.gluten.utils.ConfigUtil
@@ -35,21 +34,20 @@ trait Runtime {
}
object Runtime {
- private[runtime] def apply(name: String): Runtime with TaskResource = {
- new RuntimeImpl(name)
+ private[runtime] def apply(backendName: String, name: String): Runtime with
TaskResource = {
+ new RuntimeImpl(backendName, name)
}
- private class RuntimeImpl(name: String) extends Runtime with TaskResource {
+ private class RuntimeImpl(backendName: String, name: String) extends Runtime
with TaskResource {
private val LOGGER = LoggerFactory.getLogger(classOf[Runtime])
- private val nmm: NativeMemoryManager = NativeMemoryManager(name)
+ private val nmm: NativeMemoryManager = NativeMemoryManager(backendName,
name)
private val handle = RuntimeJniWrapper.createRuntime(
- Backend.get().name(),
+ backendName,
nmm.getHandle(),
ConfigUtil.serialize(
- GlutenConfig.getNativeSessionConf(
- Backend.get().name(),
- GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
+ GlutenConfig
+ .getNativeSessionConf(backendName,
GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)))
)
private val released: AtomicBoolean = new AtomicBoolean(false)
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
index bfb2465b12..1aca275d5c 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
@@ -21,15 +21,16 @@ import org.apache.spark.task.{TaskResource, TaskResources}
object Runtimes {
/** Get or create the runtime which bound with Spark TaskContext. */
- def contextInstance(name: String): Runtime = {
+ def contextInstance(backendName: String, name: String): Runtime = {
if (!TaskResources.inSparkTask()) {
throw new IllegalStateException("This method must be called in a Spark
task.")
}
- TaskResources.addResourceIfNotRegistered(name, () => create(name))
+ val resourceName = String.format("%s:%s", backendName, name)
+ TaskResources.addResourceIfNotRegistered(resourceName, () =>
create(backendName, name))
}
- private def create(name: String): Runtime with TaskResource = {
- Runtime(name)
+ private def create(backendName: String, name: String): Runtime with
TaskResource = {
+ Runtime(backendName, name)
}
}
diff --git
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriterFactory.scala
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriterFactory.scala
index 42dcd9bed5..e4652c5415 100644
---
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriterFactory.scala
+++
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriterFactory.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.shuffle
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.spark.TaskContext
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
@@ -26,7 +26,7 @@ import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
class CHCelebornColumnarShuffleWriterFactory extends
CelebornShuffleWriterFactory {
- override def backendName(): String = Backend.get().name()
+ override def backendName(): String = BackendsApiManager.getBackendName
override def createShuffleWriterInstance[K, V](
shuffleId: Int,
diff --git
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index d5f20c8dea..a4a97d43de 100644
---
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.shuffle
import org.apache.gluten.GlutenConfig
import org.apache.gluten.GlutenConfig.{GLUTEN_RSS_SORT_SHUFFLE_WRITER,
GLUTEN_SORT_SHUFFLE_WRITER}
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.utils.ArrowAbiUtil
@@ -65,7 +66,8 @@ private class CelebornColumnarBatchSerializerInstance(
extends SerializerInstance
with Logging {
- private val runtime = Runtimes.contextInstance("CelebornShuffleReader")
+ private val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"CelebornShuffleReader")
private val shuffleReaderHandle = {
val allocator: BufferAllocator = ArrowBufferAllocators
diff --git
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 1a0bc475d3..165d68785d 100644
---
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
import org.apache.gluten.runtime.Runtimes
@@ -51,7 +52,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
writeMetrics) {
private val isSort =
!GlutenConfig.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType)
- private val runtime = Runtimes.contextInstance("CelebornShuffleWriter")
+ private val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName,
"CelebornShuffleWriter")
private val jniWrapper = ShuffleWriterJniWrapper.create(runtime)
@@ -75,7 +77,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols}
cols")
} else {
initShuffleWriter(cb)
- val handle = ColumnarBatches.getNativeHandle(cb)
+ val handle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, cb)
val startTime = System.nanoTime()
jniWrapper.write(nativeShuffleWriter, cb.numRows, handle,
availableOffHeapPerTask())
dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
@@ -131,7 +133,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
clientPushBufferMaxSize,
clientPushSortMemoryThreshold,
celebornPartitionPusher,
- ColumnarBatches.getNativeHandle(columnarBatch),
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName,
columnarBatch),
context.taskAttemptId(),
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning,
context.partitionId),
"celeborn",
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index e52c53d1d5..03d16c41c7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -18,7 +18,7 @@ package org.apache.gluten
import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig._
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component
import org.apache.gluten.events.GlutenBuildInfoEvent
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenSessionExtensions
@@ -68,7 +68,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin
with Logging {
setPredefinedConfigs(conf)
// Initialize Backend.
- Backend.get().onDriverStart(sc, pluginContext)
+ Component.sorted().foreach(_.onDriverStart(sc, pluginContext))
Collections.emptyMap()
}
@@ -84,12 +84,10 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
}
override def shutdown(): Unit = {
- Backend.get().onDriverShutdown()
+ Component.sorted().reverse.foreach(_.onDriverShutdown())
}
private def postBuildInfoEvent(sc: SparkContext): Unit = {
- val buildInfo = Backend.get().buildInfo()
-
// export gluten version to property to spark
System.setProperty("gluten.version", VERSION)
@@ -105,10 +103,16 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
glutenBuildInfo.put("Gluten Revision Time", REVISION_TIME)
glutenBuildInfo.put("Gluten Build Time", BUILD_DATE)
glutenBuildInfo.put("Gluten Repo URL", REPO_URL)
- glutenBuildInfo.put("Backend", buildInfo.name)
- glutenBuildInfo.put("Backend Branch", buildInfo.branch)
- glutenBuildInfo.put("Backend Revision", buildInfo.revision)
- glutenBuildInfo.put("Backend Revision Time", buildInfo.revisionTime)
+
+ Component.sorted().foreach {
+ comp =>
+ val buildInfo = comp.buildInfo()
+ glutenBuildInfo.put("Component", buildInfo.name)
+ glutenBuildInfo.put("Component Branch", buildInfo.branch)
+ glutenBuildInfo.put("Component Revision", buildInfo.revision)
+ glutenBuildInfo.put("Component Revision Time", buildInfo.revisionTime)
+ }
+
val infoMap = glutenBuildInfo.toMap
val loggingInfo = infoMap.toSeq
.sortBy(_._1)
@@ -254,12 +258,12 @@ private[gluten] class GlutenExecutorPlugin extends
ExecutorPlugin {
/** Initialize the executor plugin. */
override def init(ctx: PluginContext, extraConf: util.Map[String, String]):
Unit = {
// Initialize Backend.
- Backend.get().onExecutorStart(ctx)
+ Component.sorted().foreach(_.onExecutorStart(ctx))
}
/** Clean up and terminate this plugin. For example: close the native
engine. */
override def shutdown(): Unit = {
- Backend.get().onExecutorShutdown()
+ Component.sorted().reverse.foreach(_.onExecutorShutdown())
super.shutdown()
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
index f406a6ac4d..02a2a44349 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
@@ -16,60 +16,11 @@
*/
package org.apache.gluten.backend
-import org.apache.gluten.extension.columnar.transition.ConventionFunc
-import org.apache.gluten.extension.injector.Injector
-
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.PluginContext
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters
-
-trait Backend {
- import Backend._
-
- /** Base information. */
- def name(): String
- def buildInfo(): BuildInfo
-
- /** Spark listeners. */
- def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
- def onDriverShutdown(): Unit = {}
- def onExecutorStart(pc: PluginContext): Unit = {}
- def onExecutorShutdown(): Unit = {}
+trait Backend extends Component {
/**
- * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
- * determine the convention (its row-based processing / columnar-batch
processing support) of a
- * plan with a user-defined function that accepts a plan then returns
convention type it outputs,
- * and input conventions it requires.
+ * Backends don't have dependencies. They are all considered root components
in the component DAG
+ * and will be loaded at the beginning.
*/
- def convFuncOverride(): ConventionFunc.Override =
ConventionFunc.Override.Empty
-
- /** Query planner rules. */
- def injectRules(injector: Injector): Unit
-}
-
-object Backend {
- private val backend: Backend = {
- val discoveredBackends =
-
JavaConverters.iterableAsScalaIterable(ServiceLoader.load(classOf[Backend])).toList
- discoveredBackends match {
- case Nil =>
- throw new IllegalStateException("Backend implementation not discovered
from JVM classpath")
- case head :: Nil =>
- head
- case backends =>
- val backendNames = backends.map(_.name())
- throw new IllegalStateException(
- s"More than one Backend implementation discovered from JVM
classpath: $backendNames")
- }
- }
-
- def get(): Backend = {
- backend
- }
-
- case class BuildInfo(name: String, branch: String, revision: String,
revisionTime: String)
+ final override def dependencies(): Seq[Class[_ <: Component]] = Nil
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backend/Component.scala
b/gluten-core/src/main/scala/org/apache/gluten/backend/Component.scala
new file mode 100644
index 0000000000..8670bede87
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Component.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backend
+
+import org.apache.gluten.extension.columnar.transition.ConventionFunc
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.plugin.PluginContext
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable
+
+/**
+ * The base API to inject user-defined logic to Gluten. To register a
component, its implementations
+ * should be placed to Gluten's classpath with a Java service file. Gluten
will discover all the
+ * component implementations then register them at the booting time.
+ *
+ * Experimental: This is not expected to be used in production yet. Use
[[Backend]] instead.
+ */
+@Experimental
+trait Component {
+ import Component._
+
+ private val uid = nextUid.getAndIncrement()
+ private val isRegistered = new AtomicBoolean(false)
+
+ def ensureRegistered(): Unit = {
+ if (!isRegistered.compareAndSet(false, true)) {
+ return
+ }
+ graph.add(this)
+ dependencies().foreach(req => graph.declareDependency(this, req))
+ }
+
+ /** Base information. */
+ def name(): String
+ def buildInfo(): BuildInfo
+ def dependencies(): Seq[Class[_ <: Component]]
+
+ /** Spark listeners. */
+ def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
+ def onDriverShutdown(): Unit = {}
+ def onExecutorStart(pc: PluginContext): Unit = {}
+ def onExecutorShutdown(): Unit = {}
+
+ /**
+ * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
+ * determine the convention (its row-based processing / columnar-batch
processing support) of a
+ * plan with a user-defined function that accepts a plan then returns
convention type it outputs,
+ * and input conventions it requires.
+ */
+ def convFuncOverride(): ConventionFunc.Override =
ConventionFunc.Override.Empty
+
+ /** Query planner rules. */
+ def injectRules(injector: Injector): Unit
+}
+
+object Component {
+ private val nextUid = new AtomicInteger()
+ private val graph: Graph = new Graph()
+
+ // format: off
+ /**
+ * Apply topological sort on all registered components in graph to get an
ordered list of
+ * components. The root nodes will be on the head side of the list, while
leaf nodes
+ * will be on the tail side of the list.
+ *
+ * Say if component-A depends on component-B while component-C requires
nothing, then the
+ * output order will be one of the following:
+ *
+ * 1. [component-B, component-A, component-C]
+ * 2. [component-C, component-B, component-A]
+ * 3. [component-B, component-C, component-A]
+ *
+ * By all means component B will be placed before component A because of the
declared
+ * dependency from component A to component B.
+ *
+ * @throws UnsupportedOperationException When cycles in dependency graph are
found.
+ */
+ // format: on
+ def sorted(): Seq[Component] = {
+ ensureAllComponentsRegistered()
+ graph.sorted()
+ }
+
+ private[backend] def sortedUnsafe(): Seq[Component] = {
+ graph.sorted()
+ }
+
+ private class Registry {
+ private val lookupByUid: mutable.Map[Int, Component] = mutable.Map()
+ private val lookupByClass: mutable.Map[Class[_ <: Component], Component] =
mutable.Map()
+
+ def register(comp: Component): Unit = synchronized {
+ val uid = comp.uid
+ val clazz = comp.getClass
+ require(!lookupByUid.contains(uid), s"Component UID $uid already
registered: ${comp.name()}")
+ require(
+ !lookupByClass.contains(clazz),
+ s"Component class $clazz already registered: ${comp.name()}")
+ lookupByUid += uid -> comp
+ lookupByClass += clazz -> comp
+ }
+
+ def isUidRegistered(uid: Int): Boolean = synchronized {
+ lookupByUid.contains(uid)
+ }
+
+ def isClassRegistered(clazz: Class[_ <: Component]): Boolean =
synchronized {
+ lookupByClass.contains(clazz)
+ }
+
+ def findByClass(clazz: Class[_ <: Component]): Component = synchronized {
+ require(lookupByClass.contains(clazz))
+ lookupByClass(clazz)
+ }
+
+ def findByUid(uid: Int): Component = synchronized {
+ require(lookupByUid.contains(uid))
+ lookupByUid(uid)
+ }
+
+ def allUids(): Seq[Int] = synchronized {
+ return lookupByUid.keys.toSeq
+ }
+ }
+
+ private class Graph {
+ import Graph._
+ private val registry: Registry = new Registry()
+ private val dependencies: mutable.Buffer[(Int, Class[_ <: Component])] =
mutable.Buffer()
+
+ private var sortedComponents: Option[Seq[Component]] = None
+
+ def add(comp: Component): Unit = synchronized {
+ require(
+ !registry.isUidRegistered(comp.uid),
+ s"Component UID ${comp.uid} already registered: ${comp.name()}")
+ require(
+ !registry.isClassRegistered(comp.getClass),
+ s"Component class ${comp.getClass} already registered: ${comp.name()}")
+ registry.register(comp)
+ sortedComponents = None
+ }
+
+ def declareDependency(comp: Component, dependencyCompClass: Class[_ <:
Component]): Unit =
+ synchronized {
+ require(registry.isUidRegistered(comp.uid))
+ require(registry.isClassRegistered(comp.getClass))
+ dependencies += comp.uid -> dependencyCompClass
+ sortedComponents = None
+ }
+
+ private def newLookup(): mutable.Map[Int, Node] = {
+ val lookup: mutable.Map[Int, Node] = mutable.Map()
+
+ registry.allUids().foreach {
+ uid =>
+ require(!lookup.contains(uid))
+ val n = new Node(uid)
+ lookup += uid -> n
+ }
+
+ dependencies.foreach {
+ case (uid, dependencyCompClass) =>
+ val dependencyUid = registry.findByClass(dependencyCompClass).uid
+ require(uid != dependencyUid)
+ require(lookup.contains(uid))
+ require(lookup.contains(dependencyUid))
+ val n = lookup(uid)
+ val r = lookup(dependencyUid)
+ require(!n.parents.contains(r.uid))
+ require(!r.children.contains(n.uid))
+ n.parents += r.uid -> r
+ r.children += n.uid -> n
+ }
+
+ lookup
+ }
+
+ def sorted(): Seq[Component] = synchronized {
+ if (sortedComponents.isDefined) {
+ return sortedComponents.get
+ }
+
+ val lookup: mutable.Map[Int, Node] = newLookup()
+
+ val out = mutable.Buffer[Component]()
+ val uidToNumParents = lookup.map { case (uid, node) => uid ->
node.parents.size }
+ val removalQueue = mutable.Queue[Int]()
+
+ // 1. Find out all nodes with zero parents then enqueue them.
+ uidToNumParents.filter(_._2 == 0).foreach(kv =>
removalQueue.enqueue(kv._1))
+
+ // 2. Loop to dequeue and remove nodes from the uid-to-num-parents map.
+ while (removalQueue.nonEmpty) {
+ val parentUid = removalQueue.dequeue()
+ val node = lookup(parentUid)
+ out += registry.findByUid(parentUid)
+ node.children.keys.foreach {
+ childUid =>
+ uidToNumParents += childUid -> (uidToNumParents(childUid) - 1)
+ val updatedNumParents = uidToNumParents(childUid)
+ assert(updatedNumParents >= 0)
+ if (updatedNumParents == 0) {
+ removalQueue.enqueue(childUid)
+ }
+ }
+ }
+
+ // 3. If there are still outstanding nodes (those are with more non-zero
parents) in the
+ // uid-to-num-parents map, then it means at least one cycle is found.
Report error if so.
+ if (uidToNumParents.exists(_._2 != 0)) {
+ val cycleNodes = uidToNumParents.filter(_._2 !=
0).keys.map(registry.findByUid)
+ val cycleNodeNames = cycleNodes.map(_.name()).mkString(", ")
+ throw new UnsupportedOperationException(
+ s"Cycle detected in the component graph: $cycleNodeNames")
+ }
+
+ // 4. Return the ordered nodes.
+ sortedComponents = Some(out.toSeq)
+ sortedComponents.get
+ }
+ }
+
+ private object Graph {
+ class Node(val uid: Int) {
+ val parents: mutable.Map[Int, Node] = mutable.Map()
+ val children: mutable.Map[Int, Node] = mutable.Map()
+ }
+ }
+
+ case class BuildInfo(name: String, branch: String, revision: String,
revisionTime: String)
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/backend/package.scala
new file mode 100644
index 0000000000..a9981719a3
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/package.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten
+
+import org.apache.spark.internal.Logging
+
+import java.util.ServiceLoader
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+package object backend extends Logging {
+ private[backend] val allComponentsLoaded: AtomicBoolean = new
AtomicBoolean(false)
+
+ private[backend] def ensureAllComponentsRegistered(): Unit = {
+ if (!allComponentsLoaded.compareAndSet(false, true)) {
+ return
+ }
+
+ // Load all components in classpath.
+ val discoveredBackends = ServiceLoader.load(classOf[Backend]).asScala
+ val discoveredComponents = ServiceLoader.load(classOf[Component]).asScala
+ val all = discoveredBackends ++ discoveredComponents
+
+ // Register all components.
+ all.foreach(_.ensureRegistered())
+
+ // Output log so users can view the component loading order.
+ // Call #sortedUnsafe rather than #sorted to avoid unnecessary recursion.
+ val components = Component.sortedUnsafe()
+ logInfo(s"Components registered within order:
${components.map(_.name()).mkString(", ")}")
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index addcad8dd0..794f38365a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component
import org.apache.gluten.extension.injector.Injector
import org.apache.spark.internal.Logging
@@ -47,7 +47,8 @@ private[gluten] class GlutenSessionExtensions
logDebug(s"Gluten is disabled by variable: glutenEnabledForThread:
$glutenEnabledForThread")
disabled
}
- Backend.get().injectRules(injector)
+ // Components should override Backend's rules. Hence, reversed injection
order is applied.
+ Component.sorted().reverse.foreach(_.injectRules(injector))
injector.inject()
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 67399e25d4..f1a325bc43 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.enumerated
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component
import org.apache.gluten.exception.GlutenException
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.enumerated.planner.GlutenOptimization
@@ -74,7 +74,8 @@ object EnumeratedTransform {
def static(): EnumeratedTransform = {
val exts = new SparkSessionExtensions()
val dummyInjector = new Injector(exts)
- Backend.get().injectRules(dummyInjector)
+ // Components should override Backend's rules. Hence, reversed injection
order is applied.
+ Component.sorted().reverse.foreach(_.injectRules(dummyInjector))
val session = SparkSession.getActiveSession.getOrElse(
throw new GlutenException(
"HeuristicTransform#static can only be called when an active Spark
session exists"))
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
index aa13cd958c..e53c4cbf80 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.heuristic
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component
import org.apache.gluten.exception.GlutenException
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -88,7 +88,8 @@ object HeuristicTransform {
def static(): HeuristicTransform = {
val exts = new SparkSessionExtensions()
val dummyInjector = new Injector(exts)
- Backend.get().injectRules(dummyInjector)
+ // Components should override Backend's rules. Hence, reversed injection
order is applied.
+ Component.sorted().reverse.foreach(_.injectRules(dummyInjector))
val session = SparkSession.getActiveSession.getOrElse(
throw new GlutenException(
"HeuristicTransform#static can only be called when an active Spark
session exists"))
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index 2be9271a1f..bb894c2af0 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component
import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.{ColumnarToRowExec,
SparkPlan, UnionExec}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.util.SparkTestUtil
/** ConventionFunc is a utility to derive [[Convention]] or [[ConventionReq]]
from a query plan. */
sealed trait ConventionFunc {
@@ -45,8 +46,9 @@ object ConventionFunc {
// For testing, to make things work without a backend loaded.
private var ignoreBackend: Boolean = false
- // Visible for testing
+ // Visible for testing.
def ignoreBackend[T](body: => T): T = synchronized {
+ assert(SparkTestUtil.isTesting)
assert(!ignoreBackend)
ignoreBackend = true
try {
@@ -68,7 +70,20 @@ object ConventionFunc {
return Override.Empty
}
}
- Backend.get().convFuncOverride()
+ // Components should override Backend's convention function. Hence,
reversed injection order
+ // is applied.
+ val overrides = Component.sorted().reverse.map(_.convFuncOverride())
+ new Override {
+ override val rowTypeOf: PartialFunction[SparkPlan, Convention.RowType] =
{
+ overrides.map(_.rowTypeOf).reduce((l, r) => l.orElse(r))
+ }
+ override val batchTypeOf: PartialFunction[SparkPlan,
Convention.BatchType] = {
+ overrides.map(_.batchTypeOf).reduce((l, r) => l.orElse(r))
+ }
+ override val conventionReqOf: PartialFunction[SparkPlan,
Seq[ConventionReq]] = {
+ overrides.map(_.conventionReqOf).reduce((l, r) => l.orElse(r))
+ }
+ }
}
private class BuiltinFunc(o: Override) extends ConventionFunc {
diff --git
a/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleManager.scala
b/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleManager.scala
index d38781675b..aa3f4e0958 100644
---
a/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleManager.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleManager.scala
@@ -25,6 +25,10 @@ import org.apache.spark.annotation.Experimental
*
* A SPIP may cause refactoring of this class in the future:
* https://issues.apache.org/jira/browse/SPARK-45792
+ *
+ * Experimental: This is not expected to be used in production yet. Use
backend shuffle manager
+ * (e.g., ColumnarShuffleManager or other RSS shuffle manager provided in
Gluten's code
+ * base)instead.
*/
@Experimental
class GlutenShuffleManager(conf: SparkConf, isDriver: Boolean) extends
ShuffleManager {
diff --git
a/gluten-core/src/main/scala/org/apache/spark/shuffle/ShuffleManagerRegistry.scala
b/gluten-core/src/main/scala/org/apache/spark/shuffle/ShuffleManagerRegistry.scala
index 4310054caa..5b621a755d 100644
---
a/gluten-core/src/main/scala/org/apache/spark/shuffle/ShuffleManagerRegistry.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/shuffle/ShuffleManagerRegistry.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle
import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkTestUtil, Utils}
import scala.collection.mutable
@@ -49,8 +49,9 @@ class ShuffleManagerRegistry private[ShuffleManagerRegistry] {
}
}
- // Visible for testing
+ // Visible for testing.
private[shuffle] def clear(): Unit = {
+ assert(SparkTestUtil.isTesting)
this.synchronized {
classDeDup.clear()
all.clear()
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkTestUtil.scala
similarity index 60%
copy from gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
copy to gluten-core/src/main/scala/org/apache/spark/util/SparkTestUtil.scala
index bfb2465b12..4fc09cf17c 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkTestUtil.scala
@@ -14,22 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.runtime
+package org.apache.spark.util
-import org.apache.spark.task.{TaskResource, TaskResources}
-
-object Runtimes {
-
- /** Get or create the runtime which bound with Spark TaskContext. */
- def contextInstance(name: String): Runtime = {
- if (!TaskResources.inSparkTask()) {
- throw new IllegalStateException("This method must be called in a Spark
task.")
- }
-
- TaskResources.addResourceIfNotRegistered(name, () => create(name))
- }
-
- private def create(name: String): Runtime with TaskResource = {
- Runtime(name)
+object SparkTestUtil {
+ def isTesting: Boolean = {
+ Utils.isTesting
}
}
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/backend/ComponentSuite.scala
b/gluten-core/src/test/scala/org/apache/gluten/backend/ComponentSuite.scala
new file mode 100644
index 0000000000..a6f8bf2a0c
--- /dev/null
+++ b/gluten-core/src/test/scala/org/apache/gluten/backend/ComponentSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backend
+
+import org.apache.gluten.extension.injector.Injector
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+
+class ComponentSuite extends AnyFunSuite with BeforeAndAfterAll {
+ import ComponentSuite._
+
+ private val d = new DummyComponentD()
+ d.ensureRegistered()
+ private val b = new DummyBackendB()
+ b.ensureRegistered()
+ private val a = new DummyBackendA()
+ a.ensureRegistered()
+ private val c = new DummyComponentC()
+ c.ensureRegistered()
+ private val e = new DummyComponentE()
+ e.ensureRegistered()
+
+ test("Load order - sanity") {
+ val possibleOrders =
+ Set(
+ Seq(a, b, c, d, e),
+ Seq(a, b, d, c, e),
+ Seq(b, a, c, d, e),
+ Seq(b, a, d, c, e)
+ )
+
+ assert(possibleOrders.contains(Component.sorted()))
+ }
+
+ test("Register again") {
+ assertThrows[IllegalArgumentException] {
+ new DummyBackendA().ensureRegistered()
+ }
+ }
+}
+
+object ComponentSuite {
+ private class DummyBackendA extends Backend {
+ override def name(): String = "dummy-backend-a"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("DUMMY_BACKEND_A", "N/A", "N/A", "N/A")
+ override def injectRules(injector: Injector): Unit = {}
+ }
+
+ private class DummyBackendB extends Backend {
+ override def name(): String = "dummy-backend-b"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("DUMMY_BACKEND_B", "N/A", "N/A", "N/A")
+ override def injectRules(injector: Injector): Unit = {}
+ }
+
+ private class DummyComponentC extends Component {
+ override def dependencies(): Seq[Class[_ <: Component]] =
classOf[DummyBackendA] :: Nil
+
+ override def name(): String = "dummy-component-c"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("DUMMY_COMPONENT_C", "N/A", "N/A", "N/A")
+ override def injectRules(injector: Injector): Unit = {}
+ }
+
+ private class DummyComponentD extends Component {
+ override def dependencies(): Seq[Class[_ <: Component]] =
+ Seq(classOf[DummyBackendA], classOf[DummyBackendB])
+
+ override def name(): String = "dummy-component-d"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("DUMMY_COMPONENT_D", "N/A", "N/A", "N/A")
+ override def injectRules(injector: Injector): Unit = {}
+ }
+
+ private class DummyComponentE extends Component {
+ override def dependencies(): Seq[Class[_ <: Component]] =
+ Seq(classOf[DummyBackendA], classOf[DummyComponentD])
+
+ override def name(): String = "dummy-component-e"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("DUMMY_COMPONENT_E", "N/A", "N/A", "N/A")
+ override def injectRules(injector: Injector): Unit = {}
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
index 942058cc54..ab8ab36889 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
@@ -16,14 +16,16 @@
*/
package org.apache.gluten.backendsapi
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.backend.Component
object BackendsApiManager {
private lazy val backend: SubstraitBackend = initializeInternal()
/** Initialize all backends api. */
private def initializeInternal(): SubstraitBackend = {
- Backend.get().asInstanceOf[SubstraitBackend]
+ val loadedSubstraitBackends =
Component.sorted().filter(_.isInstanceOf[SubstraitBackend])
+ assert(loadedSubstraitBackends.size == 1, "More than one Substrait
backends are loaded")
+ loadedSubstraitBackends.head.asInstanceOf[SubstraitBackend]
}
/**
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 84fac8ace6..d32da8d93b 100644
---
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.writer;
import org.apache.gluten.GlutenConfig;
+import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.columnarbatch.ColumnarBatches;
import org.apache.gluten.memory.memtarget.MemoryTarget;
import org.apache.gluten.memory.memtarget.Spiller;
@@ -70,7 +71,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends
RssShuffleWriter<K,
private int compressionBufferSize;
private final int partitionId;
- private final Runtime runtime =
Runtimes.contextInstance("UniffleShuffleWriter");
+ private final Runtime runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName(),
"UniffleShuffleWriter");
private final ShuffleWriterJniWrapper jniWrapper =
ShuffleWriterJniWrapper.create(runtime);
private final int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
private final int bufferSize;
@@ -144,7 +146,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
if (cb.numRows() == 0 || cb.numCols() == 0) {
LOG.info("Skip ColumnarBatch of 0 rows or 0 cols");
} else {
- long handle = ColumnarBatches.getNativeHandle(cb);
+ long handle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), cb);
if (nativeShuffleWriter == -1) {
nativeShuffleWriter =
jniWrapper.makeForRSS(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]