This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9c036bb777 [GLUTEN-7459][VL] Move 3.2 / 3.3 Velox native file writer
code to `backend-velox` / `cpp/velox` (#7461)
9c036bb777 is described below
commit 9c036bb777aebd303a5c68c77e14914c4866b99c
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Oct 11 08:13:29 2024 +0800
[GLUTEN-7459][VL] Move 3.2 / 3.3 Velox native file writer code to
`backend-velox` / `cpp/velox` (#7461)
---
.github/workflows/velox_backend.yml | 2 +-
.../datasource/VeloxDataSourceJniWrapper.java | 15 +-
.../VeloxDataSourceUtil.scala} | 14 +-
.../velox/VeloxFormatWriterInjects.scala | 13 +-
cpp/core/compute/Runtime.h | 5 -
cpp/core/jni/JniWrapper.cc | 112 -----------
cpp/velox/CMakeLists.txt | 2 +-
cpp/velox/benchmarks/ParquetWriteBenchmark.cc | 8 +-
cpp/velox/compute/VeloxRuntime.cc | 21 +-
cpp/velox/compute/VeloxRuntime.h | 5 +-
cpp/velox/jni/VeloxJniWrapper.cc | 213 ++++++++++++++++-----
.../operators/writer/VeloxDataSource.h} | 7 +-
...quetDatasource.cc => VeloxParquetDataSource.cc} | 12 +-
...arquetDatasource.h => VeloxParquetDataSource.h} | 8 +-
...tasourceABFS.h => VeloxParquetDataSourceABFS.h} | 8 +-
...DatasourceGCS.h => VeloxParquetDataSourceGCS.h} | 8 +-
...tasourceHDFS.h => VeloxParquetDataSourceHDFS.h} | 8 +-
...etDatasourceS3.h => VeloxParquetDataSourceS3.h} | 8 +-
cpp/velox/tests/RuntimeTest.cc | 4 -
19 files changed, 230 insertions(+), 243 deletions(-)
diff --git a/.github/workflows/velox_backend.yml
b/.github/workflows/velox_backend.yml
index 9b0fb6dec8..c9460edbf6 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -332,7 +332,7 @@ jobs:
cd $GITHUB_WORKSPACE/tools/gluten-it
$MVN_CMD clean install -P${{ matrix.spark }}
GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh data-gen-only --local
--benchmark-type=ds -s=30.0 --threads=12
- - name: (TODO)TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low memory,
memory isolation off
+ - name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low
memory, memory isolation off
continue-on-error: true
run: |
cd tools/gluten-it \
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceJniWrapper.java
similarity index 75%
rename from
gluten-arrow/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
rename to
backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceJniWrapper.java
index 7e4b98196c..23f071aff1 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
+++
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceJniWrapper.java
@@ -25,16 +25,15 @@ import
org.apache.spark.sql.execution.datasources.BlockStripes;
import java.util.Map;
/** The jni file is at `cpp/core/jni/JniWrapper.cc` */
-// FIXME: move to module gluten-data?
-public class DatasourceJniWrapper implements RuntimeAware {
+public class VeloxDataSourceJniWrapper implements RuntimeAware {
private final Runtime runtime;
- private DatasourceJniWrapper(Runtime runtime) {
+ private VeloxDataSourceJniWrapper(Runtime runtime) {
this.runtime = runtime;
}
- public static DatasourceJniWrapper create(Runtime runtime) {
- return new DatasourceJniWrapper(runtime);
+ public static VeloxDataSourceJniWrapper create(Runtime runtime) {
+ return new VeloxDataSourceJniWrapper(runtime);
}
@Override
@@ -42,11 +41,11 @@ public class DatasourceJniWrapper implements RuntimeAware {
return runtime.getHandle();
}
- public long nativeInitDatasource(String filePath, long cSchema, Map<String,
String> options) {
- return nativeInitDatasource(filePath, cSchema,
ConfigUtil.serialize(options));
+ public long init(String filePath, long cSchema, Map<String, String> options)
{
+ return init(filePath, cSchema, ConfigUtil.serialize(options));
}
- public native long nativeInitDatasource(String filePath, long cSchema,
byte[] options);
+ public native long init(String filePath, long cSchema, byte[] options);
public native void inspectSchema(long dsHandle, long cSchemaAddress);
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
similarity index 84%
rename from
backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
rename to
backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
index 8963ce93c1..fe0d0eb0f8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.utils
+package org.apache.gluten.datasource
-import org.apache.gluten.datasource.DatasourceJniWrapper
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
+import org.apache.gluten.utils.ArrowAbiUtil
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkSchemaUtil
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileStatus
import java.util
-object DatasourceUtil {
+object VeloxDataSourceUtil {
def readSchema(files: Seq[FileStatus]): Option[StructType] = {
if (files.isEmpty) {
throw new IllegalArgumentException("No input file specified")
@@ -39,11 +39,9 @@ object DatasourceUtil {
def readSchema(file: FileStatus): Option[StructType] = {
val allocator = ArrowBufferAllocators.contextInstance()
val runtime = Runtimes.contextInstance("VeloxWriter")
- val datasourceJniWrapper = DatasourceJniWrapper.create(runtime)
- val dsHandle = datasourceJniWrapper.nativeInitDatasource(
- file.getPath.toString,
- -1,
- new util.HashMap[String, String]())
+ val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
+ val dsHandle =
+ datasourceJniWrapper.init(file.getPath.toString, -1, new
util.HashMap[String, String]())
val cSchema = ArrowSchema.allocateNew(allocator)
datasourceJniWrapper.inspectSchema(dsHandle, cSchema.memoryAddress())
try {
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 65eca8a18c..91dde1c4c6 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -17,12 +17,12 @@
package org.apache.spark.sql.execution.datasources.velox
import org.apache.gluten.columnarbatch.{ColumnarBatches,
ColumnarBatchJniWrapper}
-import org.apache.gluten.datasource.DatasourceJniWrapper
+import org.apache.gluten.datasource.{VeloxDataSourceJniWrapper,
VeloxDataSourceUtil}
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.GlutenRowSplitter
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
-import org.apache.gluten.utils.{ArrowAbiUtil, DatasourceUtil}
+import org.apache.gluten.utils.ArrowAbiUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -59,12 +59,11 @@ trait VeloxFormatWriterInjects extends
GlutenFormatWriterInjectsBase {
val cSchema =
ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
var dsHandle = -1L
val runtime = Runtimes.contextInstance("VeloxWriter")
- val datasourceJniWrapper = DatasourceJniWrapper.create(runtime)
+ val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
val allocator = ArrowBufferAllocators.contextInstance()
try {
ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
- dsHandle =
- datasourceJniWrapper.nativeInitDatasource(filePath,
cSchema.memoryAddress(), nativeConf)
+ dsHandle = datasourceJniWrapper.init(filePath, cSchema.memoryAddress(),
nativeConf)
} catch {
case e: IOException =>
throw new GlutenException(e)
@@ -105,7 +104,7 @@ trait VeloxFormatWriterInjects extends
GlutenFormatWriterInjectsBase {
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- DatasourceUtil.readSchema(files)
+ VeloxDataSourceUtil.readSchema(files)
}
}
@@ -117,7 +116,7 @@ class VeloxRowSplitter extends GlutenRowSplitter {
reserve_partition_columns: Boolean = false): BlockStripes = {
val handler = ColumnarBatches.getNativeHandle(row.batch)
val runtime = Runtimes.contextInstance("VeloxPartitionWriter")
- val datasourceJniWrapper = DatasourceJniWrapper.create(runtime)
+ val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
val originalColumns: Array[Int] = Array.range(0, row.batch.numCols())
val dataColIndice =
originalColumns.filterNot(partitionColIndice.contains(_))
new VeloxBlockStripes(
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 60be778ef1..4aa67dfe7b 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -27,7 +27,6 @@
#include "operators/c2r/ColumnarToRow.h"
#include "operators/r2c/RowToColumnar.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
-#include "operators/writer/Datasource.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
@@ -108,10 +107,6 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
virtual Metrics* getMetrics(ColumnarBatchIterator* rawIter, int64_t
exportNanos) = 0;
- virtual std::shared_ptr<Datasource> createDatasource(
- const std::string& filePath,
- std::shared_ptr<arrow::Schema> schema) = 0;
-
virtual std::shared_ptr<ShuffleReader> createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ShuffleReaderOptions options) = 0;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 13ea8492cb..3077e9ed4d 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -25,8 +25,6 @@
#include "jni/JniCommon.h"
#include "jni/JniError.h"
-#include "operators/writer/Datasource.h"
-
#include <arrow/c/bridge.h>
#include <optional>
#include <string>
@@ -70,9 +68,6 @@ static jclass shuffleReaderMetricsClass;
static jmethodID shuffleReaderMetricsSetDecompressTime;
static jmethodID shuffleReaderMetricsSetDeserializeTime;
-static jclass blockStripesClass;
-static jmethodID blockStripesConstructor;
-
class JavaInputStreamAdaptor final : public arrow::io::InputStream {
public:
JavaInputStreamAdaptor(JNIEnv* env, arrow::MemoryPool* pool, jobject jniIn)
: pool_(pool) {
@@ -196,10 +191,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
shuffleReaderMetricsSetDeserializeTime =
getMethodIdOrError(env, shuffleReaderMetricsClass, "setDeserializeTime",
"(J)V");
- blockStripesClass =
- createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
- blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>",
"(J[J[II[B)V");
-
return jniVersion;
}
@@ -212,7 +203,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
env->DeleteGlobalRef(nativeColumnarToRowInfoClass);
env->DeleteGlobalRef(byteArrayClass);
env->DeleteGlobalRef(shuffleReaderMetricsClass);
- env->DeleteGlobalRef(blockStripesClass);
gluten::getJniErrorState()->close();
gluten::getJniCommonState()->close();
@@ -1044,108 +1034,6 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper
JNI_METHOD_END()
}
-JNIEXPORT jlong JNICALL
Java_org_apache_gluten_datasource_DatasourceJniWrapper_nativeInitDatasource( //
NOLINT
- JNIEnv* env,
- jobject wrapper,
- jstring filePath,
- jlong cSchema,
- jbyteArray options) {
- JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
-
- ObjectHandle handle = kInvalidObjectHandle;
-
- if (cSchema == -1) {
- // Only inspect the schema and not write
- handle = ctx->saveObject(ctx->createDatasource(jStringToCString(env,
filePath), nullptr));
- } else {
- auto safeArray = gluten::getByteArrayElementsSafe(env, options);
- auto datasourceOptions = gluten::parseConfMap(env, safeArray.elems(),
safeArray.length());
- auto& sparkConf = ctx->getConfMap();
- datasourceOptions.insert(sparkConf.begin(), sparkConf.end());
- auto schema =
gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct
ArrowSchema*>(cSchema)));
- handle = ctx->saveObject(ctx->createDatasource(jStringToCString(env,
filePath), schema));
- auto datasource = ObjectStore::retrieve<Datasource>(handle);
- datasource->init(datasourceOptions);
- }
-
- return handle;
- JNI_METHOD_END(kInvalidObjectHandle)
-}
-
-JNIEXPORT void JNICALL
Java_org_apache_gluten_datasource_DatasourceJniWrapper_inspectSchema( // NOLINT
- JNIEnv* env,
- jobject wrapper,
- jlong dsHandle,
- jlong cSchema) {
- JNI_METHOD_START
- auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
- datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
- JNI_METHOD_END()
-}
-
-JNIEXPORT void JNICALL
Java_org_apache_gluten_datasource_DatasourceJniWrapper_close( // NOLINT
- JNIEnv* env,
- jobject wrapper,
- jlong dsHandle) {
- JNI_METHOD_START
- auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
- datasource->close();
- ObjectStore::release(dsHandle);
- JNI_METHOD_END()
-}
-
-JNIEXPORT void JNICALL
Java_org_apache_gluten_datasource_DatasourceJniWrapper_writeBatch( // NOLINT
- JNIEnv* env,
- jobject wrapper,
- jlong dsHandle,
- jlong batchHandle) {
- JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
- auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
- datasource->write(batch);
- JNI_METHOD_END()
-}
-
-JNIEXPORT jobject JNICALL
-Java_org_apache_gluten_datasource_DatasourceJniWrapper_splitBlockByPartitionAndBucket(
// NOLINT
- JNIEnv* env,
- jobject wrapper,
- jlong batchHandle,
- jintArray partitionColIndice,
- jboolean hasBucket,
- jlong memoryManagerId) {
- JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
- auto safeArray = gluten::getIntArrayElementsSafe(env, partitionColIndice);
- int size = env->GetArrayLength(partitionColIndice);
- std::vector<int32_t> partitionColIndiceVec;
- for (int i = 0; i < size; ++i) {
- partitionColIndiceVec.push_back(safeArray.elems()[i]);
- }
-
- auto result = batch->toUnsafeRow(0);
- auto rowBytes = result.data();
- auto newBatchHandle = ctx->saveObject(ctx->select(batch,
partitionColIndiceVec));
-
- auto bytesSize = result.size();
- jbyteArray bytesArray = env->NewByteArray(bytesSize);
- env->SetByteArrayRegion(bytesArray, 0, bytesSize,
reinterpret_cast<jbyte*>(rowBytes));
-
- jlongArray batchArray = env->NewLongArray(1);
- long* cBatchArray = new long[1];
- cBatchArray[0] = newBatchHandle;
- env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray);
- delete[] cBatchArray;
-
- jobject blockStripes = env->NewObject(
- blockStripesClass, blockStripesConstructor, batchHandle, batchArray,
nullptr, batch->numColumns(), bytesArray);
- return blockStripes;
- JNI_METHOD_END(nullptr)
-}
-
JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_serialize(
// NOLINT
JNIEnv* env,
jobject wrapper,
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 62ddecc571..39b3f46b06 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -187,7 +187,7 @@ set(VELOX_SRCS
operators/serializer/VeloxColumnarToRowConverter.cc
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
- operators/writer/VeloxParquetDatasource.cc
+ operators/writer/VeloxParquetDataSource.cc
shuffle/VeloxShuffleReader.cc
shuffle/VeloxShuffleWriter.cc
shuffle/VeloxHashShuffleWriter.cc
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index b1118b6ae0..2369a13bd2 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -263,20 +263,20 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark
: public GoogleBenchmar
for (auto _ : state) {
// Init VeloxParquetDataSource
- auto veloxParquetDatasource =
std::make_unique<gluten::VeloxParquetDatasource>(
+ auto veloxParquetDataSource =
std::make_unique<gluten::VeloxParquetDataSource>(
outputPath_ + "/" + fileName,
veloxPool->addAggregateChild("writer_benchmark"),
veloxPool->addLeafChild("sink_pool"),
localSchema);
- veloxParquetDatasource->init(runtime->getConfMap());
+ veloxParquetDataSource->init(runtime->getConfMap());
auto start = std::chrono::steady_clock::now();
for (const auto& vector : vectors) {
- veloxParquetDatasource->write(vector);
+ veloxParquetDataSource->write(vector);
}
auto end = std::chrono::steady_clock::now();
writeTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start).count();
- veloxParquetDatasource->close();
+ veloxParquetDataSource->close();
}
state.counters["rowgroups"] =
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 5b93fb7d53..e93dc2367e 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -36,20 +36,20 @@
#ifdef ENABLE_HDFS
-#include "operators/writer/VeloxParquetDatasourceHDFS.h"
+#include "operators/writer/VeloxParquetDataSourceHDFS.h"
#endif
#ifdef ENABLE_S3
-#include "operators/writer/VeloxParquetDatasourceS3.h"
+#include "operators/writer/VeloxParquetDataSourceS3.h"
#endif
#ifdef ENABLE_GCS
-#include "operators/writer/VeloxParquetDatasourceGCS.h"
+#include "operators/writer/VeloxParquetDataSourceGCS.h"
#endif
#ifdef ENABLE_ABFS
-#include "operators/writer/VeloxParquetDatasourceABFS.h"
+#include "operators/writer/VeloxParquetDataSourceABFS.h"
#endif
using namespace facebook;
@@ -204,44 +204,45 @@ std::shared_ptr<ShuffleWriter>
VeloxRuntime::createShuffleWriter(
return shuffleWriter;
}
-std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
+std::shared_ptr<VeloxDataSource> VeloxRuntime::createDataSource(
const std::string& filePath,
std::shared_ptr<arrow::Schema> schema) {
static std::atomic_uint32_t id{0UL};
auto veloxPool =
vmm_->getAggregateMemoryPool()->addAggregateChild("datasource." +
std::to_string(id++));
// Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool
// with parquet writer.
+ // FIXME: Check file formats?
auto sinkPool = vmm_->getLeafMemoryPool();
if (isSupportedHDFSPath(filePath)) {
#ifdef ENABLE_HDFS
- return std::make_shared<VeloxParquetDatasourceHDFS>(filePath, veloxPool,
sinkPool, schema);
+ return std::make_shared<VeloxParquetDataSourceHDFS>(filePath, veloxPool,
sinkPool, schema);
#else
throw std::runtime_error(
"The write path is hdfs path but the HDFS haven't been enabled when
writing parquet data in velox runtime!");
#endif
} else if (isSupportedS3SdkPath(filePath)) {
#ifdef ENABLE_S3
- return std::make_shared<VeloxParquetDatasourceS3>(filePath, veloxPool,
sinkPool, schema);
+ return std::make_shared<VeloxParquetDataSourceS3>(filePath, veloxPool,
sinkPool, schema);
#else
throw std::runtime_error(
"The write path is S3 path but the S3 haven't been enabled when
writing parquet data in velox runtime!");
#endif
} else if (isSupportedGCSPath(filePath)) {
#ifdef ENABLE_GCS
- return std::make_shared<VeloxParquetDatasourceGCS>(filePath, veloxPool,
sinkPool, schema);
+ return std::make_shared<VeloxParquetDataSourceGCS>(filePath, veloxPool,
sinkPool, schema);
#else
throw std::runtime_error(
"The write path is GCS path but the GCS haven't been enabled when
writing parquet data in velox runtime!");
#endif
} else if (isSupportedABFSPath(filePath)) {
#ifdef ENABLE_ABFS
- return std::make_shared<VeloxParquetDatasourceABFS>(filePath, veloxPool,
sinkPool, schema);
+ return std::make_shared<VeloxParquetDataSourceABFS>(filePath, veloxPool,
sinkPool, schema);
#else
throw std::runtime_error(
"The write path is ABFS path but the ABFS haven't been enabled when
writing parquet data in velox runtime!");
#endif
}
- return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool,
sinkPool, schema);
+ return std::make_shared<VeloxParquetDataSource>(filePath, veloxPool,
sinkPool, schema);
}
std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 74f59639d7..405bbae635 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -22,7 +22,7 @@
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "operators/serializer/VeloxColumnarToRowConverter.h"
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
@@ -68,8 +68,7 @@ class VeloxRuntime final : public Runtime {
return iter->getMetrics(exportNanos);
}
- std::shared_ptr<Datasource> createDatasource(const std::string& filePath,
std::shared_ptr<arrow::Schema> schema)
- override;
+ std::shared_ptr<VeloxDataSource> createDataSource(const std::string&
filePath, std::shared_ptr<arrow::Schema> schema);
std::shared_ptr<ShuffleReader> createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 22136ad297..b8d2b0c3c2 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -38,22 +38,30 @@
#include <iostream>
+using namespace gluten;
using namespace facebook;
#ifdef __cplusplus
extern "C" {
#endif
+static jclass blockStripesClass;
+static jmethodID blockStripesConstructor;
+
jint JNI_OnLoad(JavaVM* vm, void*) {
JNIEnv* env;
if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
return JNI_ERR;
}
- gluten::getJniCommonState()->ensureInitialized(env);
- gluten::getJniErrorState()->ensureInitialized(env);
- gluten::initVeloxJniFileSystem(env);
- gluten::initVeloxJniUDF(env);
+ getJniCommonState()->ensureInitialized(env);
+ getJniErrorState()->ensureInitialized(env);
+ initVeloxJniFileSystem(env);
+ initVeloxJniUDF(env);
+
+ blockStripesClass =
+ createGlobalClassReferenceOrError(env,
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
+ blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>",
"(J[J[II[B)V");
DLOG(INFO) << "Loaded Velox backend.";
@@ -63,10 +71,13 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
void JNI_OnUnload(JavaVM* vm, void*) {
JNIEnv* env;
vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion);
- gluten::finalizeVeloxJniUDF(env);
- gluten::finalizeVeloxJniFileSystem(env);
- gluten::getJniErrorState()->close();
- gluten::getJniCommonState()->close();
+
+ env->DeleteGlobalRef(blockStripesClass);
+
+ finalizeVeloxJniUDF(env);
+ finalizeVeloxJniFileSystem(env);
+ getJniErrorState()->close();
+ getJniCommonState()->close();
google::ShutdownGoogleLogging();
}
@@ -75,9 +86,9 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_init_NativeBackendInitializer_init
jclass,
jbyteArray conf) {
JNI_METHOD_START
- auto safeArray = gluten::getByteArrayElementsSafe(env, conf);
- auto sparkConf = gluten::parseConfMap(env, safeArray.elems(),
safeArray.length());
- gluten::VeloxBackend::create(sparkConf);
+ auto safeArray = getByteArrayElementsSafe(env, conf);
+ auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
+ VeloxBackend::create(sparkConf);
JNI_METHOD_END()
}
@@ -85,7 +96,7 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_init_NativeBackendInitializer_shut
JNIEnv* env,
jclass) {
JNI_METHOD_START
- gluten::VeloxBackend::get()->tearDown();
+ VeloxBackend::get()->tearDown();
JNI_METHOD_END()
}
@@ -93,7 +104,7 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_udf_UdfJniWrapper_registerFunction
JNIEnv* env,
jclass) {
JNI_METHOD_START
- gluten::jniRegisterFunctionSignatures(env);
+ jniRegisterFunctionSignatures(env);
JNI_METHOD_END()
}
@@ -103,14 +114,14 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
jobject wrapper,
jbyteArray planArray) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto safeArray = gluten::getByteArrayElementsSafe(env, planArray);
+ auto ctx = getRuntime(env, wrapper);
+ auto safeArray = getByteArrayElementsSafe(env, planArray);
auto planData = safeArray.elems();
auto planSize = env->GetArrayLength(planArray);
- auto runtime = dynamic_cast<gluten::VeloxRuntime*>(ctx);
+ auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
if (runtime->debugModeEnabled()) {
try {
- auto jsonPlan = gluten::substraitFromPbToJson("Plan", planData,
planSize, std::nullopt);
+ auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize,
std::nullopt);
LOG(INFO) << std::string(50, '#') << " received substrait::Plan: for
validation";
LOG(INFO) << jsonPlan;
} catch (const std::exception& e) {
@@ -119,21 +130,21 @@
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
}
::substrait::Plan subPlan;
- gluten::parseProtobuf(planData, planSize, &subPlan);
+ parseProtobuf(planData, planSize, &subPlan);
// A query context with dummy configs. Used for function validation.
std::unordered_map<std::string, std::string> configs{
{velox::core::QueryConfig::kSparkPartitionId, "0"},
{velox::core::QueryConfig::kSessionTimezone, "GMT"}};
auto queryCtx = velox::core::QueryCtx::create(nullptr,
velox::core::QueryConfig(configs));
- auto pool = gluten::defaultLeafVeloxMemoryPool().get();
+ auto pool = defaultLeafVeloxMemoryPool().get();
// An execution context used for function validation.
velox::core::ExecCtx execCtx(pool, queryCtx.get());
- gluten::SubstraitToVeloxPlanValidator planValidator(pool, &execCtx);
+ SubstraitToVeloxPlanValidator planValidator(pool, &execCtx);
jclass infoCls =
env->FindClass("Lorg/apache/gluten/validate/NativePlanValidationInfo;");
if (infoCls == nullptr) {
std::string errorMessage = "Unable to CreateGlobalClassReferenceOrError
for NativePlanValidationInfo";
- throw gluten::GlutenException(errorMessage);
+ throw GlutenException(errorMessage);
}
jmethodID method = env->GetMethodID(infoCls, "<init>",
"(ILjava/lang/String;)V");
try {
@@ -158,13 +169,13 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ
jobject wrapper,
jlong handle) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto runtime = dynamic_cast<gluten::VeloxRuntime*>(ctx);
+ auto ctx = getRuntime(env, wrapper);
+ auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
- auto batch = gluten::ObjectStore::retrieve<gluten::ColumnarBatch>(handle);
- auto newBatch =
gluten::VeloxColumnarBatch::from(runtime->memoryManager()->getLeafMemoryPool().get(),
batch);
+ auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
+ auto newBatch =
VeloxColumnarBatch::from(runtime->memoryManager()->getLeafMemoryPool().get(),
batch);
return ctx->saveObject(newBatch);
- JNI_METHOD_END(gluten::kInvalidObjectHandle)
+ JNI_METHOD_END(kInvalidObjectHandle)
}
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_compose( //
NOLINT
@@ -172,22 +183,21 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ
jobject wrapper,
jlongArray batchHandles) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto runtime = dynamic_cast<gluten::VeloxRuntime*>(ctx);
+ auto ctx = getRuntime(env, wrapper);
+ auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
int handleCount = env->GetArrayLength(batchHandles);
- auto safeArray = gluten::getLongArrayElementsSafe(env, batchHandles);
+ auto safeArray = getLongArrayElementsSafe(env, batchHandles);
- std::vector<std::shared_ptr<gluten::ColumnarBatch>> batches;
+ std::vector<std::shared_ptr<ColumnarBatch>> batches;
for (int i = 0; i < handleCount; ++i) {
int64_t handle = safeArray.elems()[i];
- auto batch = gluten::ObjectStore::retrieve<gluten::ColumnarBatch>(handle);
+ auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
batches.push_back(batch);
}
- auto newBatch =
-
gluten::VeloxColumnarBatch::compose(runtime->memoryManager()->getLeafMemoryPool().get(),
std::move(batches));
+ auto newBatch =
VeloxColumnarBatch::compose(runtime->memoryManager()->getLeafMemoryPool().get(),
std::move(batches));
return ctx->saveObject(newBatch);
- JNI_METHOD_END(gluten::kInvalidObjectHandle)
+ JNI_METHOD_END(kInvalidObjectHandle)
}
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_empty( // NOLINT
@@ -195,12 +205,12 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_
jobject wrapper,
jint capacity) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
+ auto ctx = getRuntime(env, wrapper);
auto filter =
std::make_shared<velox::BloomFilter<std::allocator<uint64_t>>>();
filter->reset(capacity);
GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
return ctx->saveObject(filter);
- JNI_METHOD_END(gluten::kInvalidObjectHandle)
+ JNI_METHOD_END(kInvalidObjectHandle)
}
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_init( // NOLINT
@@ -208,13 +218,13 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_
jobject wrapper,
jbyteArray data) {
JNI_METHOD_START
- auto safeArray = gluten::getByteArrayElementsSafe(env, data);
- auto ctx = gluten::getRuntime(env, wrapper);
+ auto safeArray = getByteArrayElementsSafe(env, data);
+ auto ctx = getRuntime(env, wrapper);
auto filter =
std::make_shared<velox::BloomFilter<std::allocator<uint64_t>>>();
uint8_t* serialized = safeArray.elems();
filter->merge(reinterpret_cast<char*>(serialized));
return ctx->saveObject(filter);
- JNI_METHOD_END(gluten::kInvalidObjectHandle)
+ JNI_METHOD_END(kInvalidObjectHandle)
}
JNIEXPORT void JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_insertLong( // NOLINT
@@ -223,7 +233,7 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_i
jlong handle,
jlong item) {
JNI_METHOD_START
- auto filter =
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+ auto filter =
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
filter->insert(folly::hasher<int64_t>()(item));
JNI_METHOD_END()
@@ -235,7 +245,7 @@ JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapp
jlong handle,
jlong item) {
JNI_METHOD_START
- auto filter =
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+ auto filter =
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
bool out = filter->mayContain(folly::hasher<int64_t>()(item));
return out;
@@ -259,8 +269,8 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_m
jlong handle,
jlong other) {
JNI_METHOD_START
- auto to =
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
- auto from =
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(other);
+ auto to =
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+ auto from =
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(other);
GLUTEN_CHECK(to->isSet(), "Bloom-filter is not initialized");
GLUTEN_CHECK(from->isSet(), "Bloom-filter is not initialized");
std::vector<char> serialized = serialize(from.get());
@@ -273,7 +283,7 @@ JNIEXPORT jbyteArray JNICALL
Java_org_apache_gluten_utils_VeloxBloomFilterJniWra
jobject wrapper,
jlong handle) {
JNI_METHOD_START
- auto filter =
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+ auto filter =
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
std::vector<char> buffer = serialize(filter.get());
auto size = buffer.capacity();
@@ -290,13 +300,13 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper
jint maxOutputBatchSize,
jobject jIter) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto pool =
dynamic_cast<gluten::VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
- auto iter = gluten::makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
- auto appender = std::make_shared<gluten::ResultIterator>(
- std::make_unique<gluten::VeloxBatchResizer>(pool.get(),
minOutputBatchSize, maxOutputBatchSize, std::move(iter)));
+ auto ctx = getRuntime(env, wrapper);
+ auto pool =
dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
+ auto iter = makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
+ auto appender = std::make_shared<ResultIterator>(
+ std::make_unique<VeloxBatchResizer>(pool.get(), minOutputBatchSize,
maxOutputBatchSize, std::move(iter)));
return ctx->saveObject(appender);
- JNI_METHOD_END(gluten::kInvalidObjectHandle)
+ JNI_METHOD_END(kInvalidObjectHandle)
}
JNIEXPORT jboolean JNICALL
@@ -317,6 +327,109 @@
Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByR
JNI_METHOD_END(false)
}
+// JNI entry point: creates a Velox data source for `filePath`, registers it with the runtime's
+// object store and returns its handle.
+//
+// Parameters:
+//   filePath - target path, converted with jStringToCString.
+//   cSchema  - either -1 (inspect-only mode: the data source is created with a null schema and
+//              init() is not called) or the address of a struct ArrowSchema, which is imported
+//              and used as the schema of the data source.
+//   options  - serialized conf map; only read in write mode, where the runtime's conf map is
+//              inserted into it before it is passed to init().
+//
+// NOTE(review): JNI_METHOD_START / JNI_METHOD_END are defined outside this hunk; presumably they
+// convert C++ exceptions into Java exceptions and return kInvalidObjectHandle on failure.
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_init( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jstring filePath,
+    jlong cSchema,
+    jbyteArray options) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
+  // dynamic_cast yields nullptr when ctx is not a VeloxRuntime; fail loudly instead of
+  // dereferencing a null pointer below.
+  GLUTEN_CHECK(runtime != nullptr, "Not a Velox runtime");
+
+  ObjectHandle handle = kInvalidObjectHandle;
+
+  if (cSchema == -1) {
+    // Inspect-only mode: no schema is supplied and the data source is not initialized for writing.
+    handle = ctx->saveObject(runtime->createDataSource(jStringToCString(env, filePath), nullptr));
+  } else {
+    auto safeArray = gluten::getByteArrayElementsSafe(env, options);
+    auto datasourceOptions = gluten::parseConfMap(env, safeArray.elems(), safeArray.length());
+    auto& sparkConf = ctx->getConfMap();
+    datasourceOptions.insert(sparkConf.begin(), sparkConf.end());
+    auto schema = gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct ArrowSchema*>(cSchema)));
+    // Initialize before registering: if init() throws, no half-initialized data source is left
+    // behind in the object store.
+    auto datasource = runtime->createDataSource(jStringToCString(env, filePath), schema);
+    datasource->init(datasourceOptions);
+    handle = ctx->saveObject(datasource);
+  }
+
+  return handle;
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+// JNI entry point: looks up the data source registered under `dsHandle` and asks it to write
+// its schema into the struct ArrowSchema located at address `cSchema`.
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_inspectSchema( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong dsHandle,
+    jlong cSchema) {
+  JNI_METHOD_START
+  // The Java side passes the output ArrowSchema as a raw address.
+  auto* outSchema = reinterpret_cast<struct ArrowSchema*>(cSchema);
+  ObjectStore::retrieve<VeloxDataSource>(dsHandle)->inspectSchema(outSchema);
+  JNI_METHOD_END()
+}
+
+// JNI entry point: closes the data source registered under `dsHandle` and then releases the
+// handle from the object store. The release is skipped if close() throws.
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_close( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong dsHandle) {
+  JNI_METHOD_START
+  const auto ds = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
+  ds->close();
+  ObjectStore::release(dsHandle);
+  JNI_METHOD_END()
+}
+
+// JNI entry point: writes the columnar batch registered under `batchHandle` to the data source
+// registered under `dsHandle`.
+JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_writeBatch( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong dsHandle,
+    jlong batchHandle) {
+  JNI_METHOD_START
+  // NOTE(review): the runtime is looked up but its result is not used here; the call is kept
+  // in case the lookup itself validates the wrapper - confirm before removing.
+  auto ctx = gluten::getRuntime(env, wrapper);
+  const auto sink = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
+  sink->write(ObjectStore::retrieve<ColumnarBatch>(batchHandle));
+  JNI_METHOD_END()
+}
+
+// JNI entry point: builds a BlockStripes Java object for the batch registered under
+// `batchHandle`.
+//
+// Steps performed:
+//   1. Copies the Java int array `partitionColIndice` into a native vector.
+//   2. Calls batch->toUnsafeRow(0) and copies the returned bytes into a new Java byte[].
+//   3. Calls ctx->select(batch, <indices>) and registers the resulting batch with the runtime.
+//   4. Constructs BlockStripes(batchHandle, long[]{newBatchHandle}, null, numColumns, byte[]).
+//
+// `hasBucket` and `memoryManagerId` are accepted for the Java signature but not read here.
+// NOTE(review): JNI_METHOD_END(nullptr) presumably makes the function return null after
+// raising a Java exception when the body throws - confirm against the macro definition.
+JNIEXPORT jobject JNICALL
+Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitionAndBucket( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong batchHandle,
+    jintArray partitionColIndice,
+    jboolean hasBucket,
+    jlong memoryManagerId) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+  auto safeArray = gluten::getIntArrayElementsSafe(env, partitionColIndice);
+  int size = env->GetArrayLength(partitionColIndice);
+  // Range construction replaces the element-by-element push_back loop.
+  std::vector<int32_t> partitionColIndiceVec(safeArray.elems(), safeArray.elems() + size);
+
+  auto result = batch->toUnsafeRow(0);
+  auto rowBytes = result.data();
+  // Keep the handle as jlong: SetLongArrayRegion takes a const jlong*, and plain `long` is not
+  // guaranteed to be 64 bits wide on every platform.
+  jlong newBatchHandle = static_cast<jlong>(ctx->saveObject(ctx->select(batch, partitionColIndiceVec)));
+
+  auto bytesSize = result.size();
+  jbyteArray bytesArray = env->NewByteArray(bytesSize);
+  env->SetByteArrayRegion(bytesArray, 0, bytesSize, reinterpret_cast<jbyte*>(rowBytes));
+
+  // Single-element long[] holding the handle of the selected batch; no heap buffer is needed.
+  jlongArray batchArray = env->NewLongArray(1);
+  env->SetLongArrayRegion(batchArray, 0, 1, &newBatchHandle);
+
+  jobject blockStripes = env->NewObject(
+      blockStripesClass, blockStripesConstructor, batchHandle, batchArray, nullptr, batch->numColumns(), bytesArray);
+  return blockStripes;
+  JNI_METHOD_END(nullptr)
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/cpp/core/operators/writer/Datasource.h
b/cpp/velox/operators/writer/VeloxDataSource.h
similarity index 89%
rename from cpp/core/operators/writer/Datasource.h
rename to cpp/velox/operators/writer/VeloxDataSource.h
index 0a14bcf7bb..043f1b8b17 100644
--- a/cpp/core/operators/writer/Datasource.h
+++ b/cpp/velox/operators/writer/VeloxDataSource.h
@@ -24,16 +24,15 @@
#include <arrow/type_fwd.h>
#include "memory/ColumnarBatch.h"
-#include "operators/writer/Datasource.h"
namespace gluten {
-class Datasource {
+class VeloxDataSource {
public:
- Datasource(const std::string& filePath, std::shared_ptr<arrow::Schema>
schema)
+ VeloxDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema>
schema)
: filePath_(filePath), schema_(schema) {}
- virtual ~Datasource() = default;
+ virtual ~VeloxDataSource() = default;
virtual void init(const std::unordered_map<std::string, std::string>&
sparkConfs) {}
virtual void inspectSchema(struct ArrowSchema* out) = 0;
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
similarity index 94%
rename from cpp/velox/operators/writer/VeloxParquetDatasource.cc
rename to cpp/velox/operators/writer/VeloxParquetDataSource.cc
index c538a04370..aeec1b4c82 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "VeloxParquetDatasource.h"
+#include "VeloxParquetDataSource.h"
#include <arrow/buffer.h>
#include <cstring>
@@ -43,7 +43,7 @@ namespace {
const int32_t kGzipWindowBits4k = 12;
}
-void VeloxParquetDatasource::initSink(const std::unordered_map<std::string,
std::string>& /* sparkConfs */) {
+void VeloxParquetDataSource::initSink(const std::unordered_map<std::string,
std::string>& /* sparkConfs */) {
if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
} else {
@@ -51,7 +51,7 @@ void VeloxParquetDatasource::initSink(const
std::unordered_map<std::string, std:
}
}
-void VeloxParquetDatasource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
+void VeloxParquetDataSource::init(const std::unordered_map<std::string,
std::string>& sparkConfs) {
initSink(sparkConfs);
if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
@@ -103,7 +103,7 @@ void VeloxParquetDatasource::init(const
std::unordered_map<std::string, std::str
parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_),
writeOption, pool_, asRowType(schema));
}
-void VeloxParquetDatasource::inspectSchema(struct ArrowSchema* out) {
+void VeloxParquetDataSource::inspectSchema(struct ArrowSchema* out) {
velox::dwio::common::ReaderOptions readerOptions(pool_.get());
auto format = velox::dwio::common::FileFormat::PARQUET;
readerOptions.setFileFormat(format);
@@ -121,13 +121,13 @@ void VeloxParquetDatasource::inspectSchema(struct
ArrowSchema* out) {
toArrowSchema(reader->rowType(), pool_.get(), out);
}
-void VeloxParquetDatasource::close() {
+void VeloxParquetDataSource::close() {
if (parquetWriter_) {
parquetWriter_->close();
}
}
-void VeloxParquetDatasource::write(const std::shared_ptr<ColumnarBatch>& cb) {
+void VeloxParquetDataSource::write(const std::shared_ptr<ColumnarBatch>& cb) {
auto veloxBatch = std::dynamic_pointer_cast<VeloxColumnarBatch>(cb);
VELOX_DCHECK(veloxBatch != nullptr, "Write batch should be
VeloxColumnarBatch");
parquetWriter_->write(veloxBatch->getFlattenedRowVector());
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h
b/cpp/velox/operators/writer/VeloxParquetDataSource.h
similarity index 94%
rename from cpp/velox/operators/writer/VeloxParquetDatasource.h
rename to cpp/velox/operators/writer/VeloxParquetDataSource.h
index 15541f8a87..e7428999f0 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.h
@@ -29,7 +29,7 @@
#include "memory/ColumnarBatch.h"
#include "memory/VeloxColumnarBatch.h"
-#include "operators/writer/Datasource.h"
+#include "operators/writer/VeloxDataSource.h"
#include "velox/common/file/FileSystems.h"
#ifdef ENABLE_S3
@@ -79,14 +79,14 @@ inline bool isSupportedABFSPath(const std::string&
filePath) {
return strncmp(filePath.c_str(), "abfs:", 5) == 0 ||
strncmp(filePath.c_str(), "abfss:", 6) == 0;
}
-class VeloxParquetDatasource : public Datasource {
+class VeloxParquetDataSource : public VeloxDataSource {
public:
- VeloxParquetDatasource(
+ VeloxParquetDataSource(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
- : Datasource(filePath, schema), filePath_(filePath), schema_(schema),
pool_(std::move(veloxPool)) {}
+ : VeloxDataSource(filePath, schema), filePath_(filePath),
schema_(schema), pool_(std::move(veloxPool)) {}
void init(const std::unordered_map<std::string, std::string>& sparkConfs)
override;
virtual void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs);
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
similarity index 89%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
index b542e42bab..63658cc54e 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
@@ -17,7 +17,7 @@
#pragma once
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
@@ -33,14 +33,14 @@
namespace gluten {
-class VeloxParquetDatasourceABFS final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceABFS final : public VeloxParquetDataSource {
public:
- VeloxParquetDatasourceABFS(
+ VeloxParquetDataSourceABFS(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
- : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+ : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
auto hiveConf =
getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
b/cpp/velox/operators/writer/VeloxParquetDataSourceGCS.h
similarity index 89%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceGCS.h
index 0c2bfa2138..22d4e96efe 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceGCS.h
@@ -17,7 +17,7 @@
#pragma once
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
@@ -32,14 +32,14 @@
#include "velox/dwio/common/Options.h"
namespace gluten {
-class VeloxParquetDatasourceGCS final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceGCS final : public VeloxParquetDataSource {
public:
- VeloxParquetDatasourceGCS(
+ VeloxParquetDataSourceGCS(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
- : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+ : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>& /*
sparkConfs */) override {
auto fileSystem = filesystems::getFileSystem(filePath_, nullptr);
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
similarity index 88%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
index 19e9e35606..053b3da2ff 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
@@ -17,7 +17,7 @@
#pragma once
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
@@ -33,14 +33,14 @@
namespace gluten {
-class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceHDFS final : public VeloxParquetDataSource {
public:
- VeloxParquetDatasourceHDFS(
+ VeloxParquetDataSourceHDFS(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
- : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+ : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
auto hiveConf =
getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
similarity index 89%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
index 8219fe42a3..3082f82a91 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
@@ -17,7 +17,7 @@
#pragma once
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
#include "utils/ConfigExtractor.h"
#include "utils/VeloxArrowUtils.h"
@@ -33,14 +33,14 @@
namespace gluten {
-class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceS3 final : public VeloxParquetDataSource {
public:
- VeloxParquetDatasourceS3(
+ VeloxParquetDataSourceS3(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
std::shared_ptr<arrow::Schema> schema)
- : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+ : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
void initSink(const std::unordered_map<std::string, std::string>&
sparkConfs) override {
auto hiveConf =
getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index b2aa3e1a86..0135b35dcd 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -77,10 +77,6 @@ class DummyRuntime final : public Runtime {
static Metrics m(1);
return &m;
}
- std::shared_ptr<Datasource> createDatasource(const std::string& filePath,
std::shared_ptr<arrow::Schema> schema)
- override {
- throw GlutenException("Not yet implemented");
- }
std::shared_ptr<ShuffleReader> createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ShuffleReaderOptions options) override {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]