This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c036bb777 [GLUTEN-7459][VL] Move 3.2 / 3.3 Velox native file writer code to `backend-velox` / `cpp/velox` (#7461)
9c036bb777 is described below

commit 9c036bb777aebd303a5c68c77e14914c4866b99c
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Oct 11 08:13:29 2024 +0800

    [GLUTEN-7459][VL] Move 3.2 / 3.3 Velox native file writer code to `backend-velox` / `cpp/velox` (#7461)
---
 .github/workflows/velox_backend.yml                |   2 +-
 .../datasource/VeloxDataSourceJniWrapper.java      |  15 +-
 .../VeloxDataSourceUtil.scala}                     |  14 +-
 .../velox/VeloxFormatWriterInjects.scala           |  13 +-
 cpp/core/compute/Runtime.h                         |   5 -
 cpp/core/jni/JniWrapper.cc                         | 112 -----------
 cpp/velox/CMakeLists.txt                           |   2 +-
 cpp/velox/benchmarks/ParquetWriteBenchmark.cc      |   8 +-
 cpp/velox/compute/VeloxRuntime.cc                  |  21 +-
 cpp/velox/compute/VeloxRuntime.h                   |   5 +-
 cpp/velox/jni/VeloxJniWrapper.cc                   | 213 ++++++++++++++++-----
 .../operators/writer/VeloxDataSource.h}            |   7 +-
 ...quetDatasource.cc => VeloxParquetDataSource.cc} |  12 +-
 ...arquetDatasource.h => VeloxParquetDataSource.h} |   8 +-
 ...tasourceABFS.h => VeloxParquetDataSourceABFS.h} |   8 +-
 ...DatasourceGCS.h => VeloxParquetDataSourceGCS.h} |   8 +-
 ...tasourceHDFS.h => VeloxParquetDataSourceHDFS.h} |   8 +-
 ...etDatasourceS3.h => VeloxParquetDataSourceS3.h} |   8 +-
 cpp/velox/tests/RuntimeTest.cc                     |   4 -
 19 files changed, 230 insertions(+), 243 deletions(-)

diff --git a/.github/workflows/velox_backend.yml 
b/.github/workflows/velox_backend.yml
index 9b0fb6dec8..c9460edbf6 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -332,7 +332,7 @@ jobs:
           cd $GITHUB_WORKSPACE/tools/gluten-it
           $MVN_CMD clean install -P${{ matrix.spark }}
           GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh data-gen-only --local 
--benchmark-type=ds -s=30.0 --threads=12
-      - name: (TODO)TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low memory, 
memory isolation off
+      - name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low 
memory, memory isolation off
         continue-on-error: true
         run: |
           cd tools/gluten-it \
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceJniWrapper.java
similarity index 75%
rename from 
gluten-arrow/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
rename to 
backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceJniWrapper.java
index 7e4b98196c..23f071aff1 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/datasource/DatasourceJniWrapper.java
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceJniWrapper.java
@@ -25,16 +25,15 @@ import 
org.apache.spark.sql.execution.datasources.BlockStripes;
 import java.util.Map;
 
 /** The jni file is at `cpp/core/jni/JniWrapper.cc` */
-// FIXME: move to module gluten-data?
-public class DatasourceJniWrapper implements RuntimeAware {
+public class VeloxDataSourceJniWrapper implements RuntimeAware {
   private final Runtime runtime;
 
-  private DatasourceJniWrapper(Runtime runtime) {
+  private VeloxDataSourceJniWrapper(Runtime runtime) {
     this.runtime = runtime;
   }
 
-  public static DatasourceJniWrapper create(Runtime runtime) {
-    return new DatasourceJniWrapper(runtime);
+  public static VeloxDataSourceJniWrapper create(Runtime runtime) {
+    return new VeloxDataSourceJniWrapper(runtime);
   }
 
   @Override
@@ -42,11 +41,11 @@ public class DatasourceJniWrapper implements RuntimeAware {
     return runtime.getHandle();
   }
 
-  public long nativeInitDatasource(String filePath, long cSchema, Map<String, 
String> options) {
-    return nativeInitDatasource(filePath, cSchema, 
ConfigUtil.serialize(options));
+  public long init(String filePath, long cSchema, Map<String, String> options) 
{
+    return init(filePath, cSchema, ConfigUtil.serialize(options));
   }
 
-  public native long nativeInitDatasource(String filePath, long cSchema, 
byte[] options);
+  public native long init(String filePath, long cSchema, byte[] options);
 
   public native void inspectSchema(long dsHandle, long cSchemaAddress);
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
similarity index 84%
rename from 
backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
rename to 
backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
index 8963ce93c1..fe0d0eb0f8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/DatasourceUtil.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/datasource/VeloxDataSourceUtil.scala
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.utils
+package org.apache.gluten.datasource
 
-import org.apache.gluten.datasource.DatasourceJniWrapper
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
+import org.apache.gluten.utils.ArrowAbiUtil
 
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkSchemaUtil
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import java.util
 
-object DatasourceUtil {
+object VeloxDataSourceUtil {
   def readSchema(files: Seq[FileStatus]): Option[StructType] = {
     if (files.isEmpty) {
       throw new IllegalArgumentException("No input file specified")
@@ -39,11 +39,9 @@ object DatasourceUtil {
   def readSchema(file: FileStatus): Option[StructType] = {
     val allocator = ArrowBufferAllocators.contextInstance()
     val runtime = Runtimes.contextInstance("VeloxWriter")
-    val datasourceJniWrapper = DatasourceJniWrapper.create(runtime)
-    val dsHandle = datasourceJniWrapper.nativeInitDatasource(
-      file.getPath.toString,
-      -1,
-      new util.HashMap[String, String]())
+    val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
+    val dsHandle =
+      datasourceJniWrapper.init(file.getPath.toString, -1, new 
util.HashMap[String, String]())
     val cSchema = ArrowSchema.allocateNew(allocator)
     datasourceJniWrapper.inspectSchema(dsHandle, cSchema.memoryAddress())
     try {
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
index 65eca8a18c..91dde1c4c6 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxFormatWriterInjects.scala
@@ -17,12 +17,12 @@
 package org.apache.spark.sql.execution.datasources.velox
 
 import org.apache.gluten.columnarbatch.{ColumnarBatches, 
ColumnarBatchJniWrapper}
-import org.apache.gluten.datasource.DatasourceJniWrapper
+import org.apache.gluten.datasource.{VeloxDataSourceJniWrapper, 
VeloxDataSourceUtil}
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.datasource.GlutenRowSplitter
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
-import org.apache.gluten.utils.{ArrowAbiUtil, DatasourceUtil}
+import org.apache.gluten.utils.ArrowAbiUtil
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -59,12 +59,11 @@ trait VeloxFormatWriterInjects extends 
GlutenFormatWriterInjectsBase {
     val cSchema = 
ArrowSchema.allocateNew(ArrowBufferAllocators.contextInstance())
     var dsHandle = -1L
     val runtime = Runtimes.contextInstance("VeloxWriter")
-    val datasourceJniWrapper = DatasourceJniWrapper.create(runtime)
+    val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
     val allocator = ArrowBufferAllocators.contextInstance()
     try {
       ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
-      dsHandle =
-        datasourceJniWrapper.nativeInitDatasource(filePath, 
cSchema.memoryAddress(), nativeConf)
+      dsHandle = datasourceJniWrapper.init(filePath, cSchema.memoryAddress(), 
nativeConf)
     } catch {
       case e: IOException =>
         throw new GlutenException(e)
@@ -105,7 +104,7 @@ trait VeloxFormatWriterInjects extends 
GlutenFormatWriterInjectsBase {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    DatasourceUtil.readSchema(files)
+    VeloxDataSourceUtil.readSchema(files)
   }
 }
 
@@ -117,7 +116,7 @@ class VeloxRowSplitter extends GlutenRowSplitter {
       reserve_partition_columns: Boolean = false): BlockStripes = {
     val handler = ColumnarBatches.getNativeHandle(row.batch)
     val runtime = Runtimes.contextInstance("VeloxPartitionWriter")
-    val datasourceJniWrapper = DatasourceJniWrapper.create(runtime)
+    val datasourceJniWrapper = VeloxDataSourceJniWrapper.create(runtime)
     val originalColumns: Array[Int] = Array.range(0, row.batch.numCols())
     val dataColIndice = 
originalColumns.filterNot(partitionColIndice.contains(_))
     new VeloxBlockStripes(
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 60be778ef1..4aa67dfe7b 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -27,7 +27,6 @@
 #include "operators/c2r/ColumnarToRow.h"
 #include "operators/r2c/RowToColumnar.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
-#include "operators/writer/Datasource.h"
 #include "shuffle/ShuffleReader.h"
 #include "shuffle/ShuffleWriter.h"
 #include "substrait/plan.pb.h"
@@ -108,10 +107,6 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
 
   virtual Metrics* getMetrics(ColumnarBatchIterator* rawIter, int64_t 
exportNanos) = 0;
 
-  virtual std::shared_ptr<Datasource> createDatasource(
-      const std::string& filePath,
-      std::shared_ptr<arrow::Schema> schema) = 0;
-
   virtual std::shared_ptr<ShuffleReader> createShuffleReader(
       std::shared_ptr<arrow::Schema> schema,
       ShuffleReaderOptions options) = 0;
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 13ea8492cb..3077e9ed4d 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -25,8 +25,6 @@
 #include "jni/JniCommon.h"
 #include "jni/JniError.h"
 
-#include "operators/writer/Datasource.h"
-
 #include <arrow/c/bridge.h>
 #include <optional>
 #include <string>
@@ -70,9 +68,6 @@ static jclass shuffleReaderMetricsClass;
 static jmethodID shuffleReaderMetricsSetDecompressTime;
 static jmethodID shuffleReaderMetricsSetDeserializeTime;
 
-static jclass blockStripesClass;
-static jmethodID blockStripesConstructor;
-
 class JavaInputStreamAdaptor final : public arrow::io::InputStream {
  public:
   JavaInputStreamAdaptor(JNIEnv* env, arrow::MemoryPool* pool, jobject jniIn) 
: pool_(pool) {
@@ -196,10 +191,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   shuffleReaderMetricsSetDeserializeTime =
       getMethodIdOrError(env, shuffleReaderMetricsClass, "setDeserializeTime", 
"(J)V");
 
-  blockStripesClass =
-      createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
-  blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", 
"(J[J[II[B)V");
-
   return jniVersion;
 }
 
@@ -212,7 +203,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   env->DeleteGlobalRef(nativeColumnarToRowInfoClass);
   env->DeleteGlobalRef(byteArrayClass);
   env->DeleteGlobalRef(shuffleReaderMetricsClass);
-  env->DeleteGlobalRef(blockStripesClass);
 
   gluten::getJniErrorState()->close();
   gluten::getJniCommonState()->close();
@@ -1044,108 +1034,6 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper
   JNI_METHOD_END()
 }
 
-JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_datasource_DatasourceJniWrapper_nativeInitDatasource( // 
NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jstring filePath,
-    jlong cSchema,
-    jbyteArray options) {
-  JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-
-  ObjectHandle handle = kInvalidObjectHandle;
-
-  if (cSchema == -1) {
-    // Only inspect the schema and not write
-    handle = ctx->saveObject(ctx->createDatasource(jStringToCString(env, 
filePath), nullptr));
-  } else {
-    auto safeArray = gluten::getByteArrayElementsSafe(env, options);
-    auto datasourceOptions = gluten::parseConfMap(env, safeArray.elems(), 
safeArray.length());
-    auto& sparkConf = ctx->getConfMap();
-    datasourceOptions.insert(sparkConf.begin(), sparkConf.end());
-    auto schema = 
gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct 
ArrowSchema*>(cSchema)));
-    handle = ctx->saveObject(ctx->createDatasource(jStringToCString(env, 
filePath), schema));
-    auto datasource = ObjectStore::retrieve<Datasource>(handle);
-    datasource->init(datasourceOptions);
-  }
-
-  return handle;
-  JNI_METHOD_END(kInvalidObjectHandle)
-}
-
-JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_DatasourceJniWrapper_inspectSchema( // NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jlong dsHandle,
-    jlong cSchema) {
-  JNI_METHOD_START
-  auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
-  datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
-  JNI_METHOD_END()
-}
-
-JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_DatasourceJniWrapper_close( // NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jlong dsHandle) {
-  JNI_METHOD_START
-  auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
-  datasource->close();
-  ObjectStore::release(dsHandle);
-  JNI_METHOD_END()
-}
-
-JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_DatasourceJniWrapper_writeBatch( // NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jlong dsHandle,
-    jlong batchHandle) {
-  JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto datasource = ObjectStore::retrieve<Datasource>(dsHandle);
-  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  datasource->write(batch);
-  JNI_METHOD_END()
-}
-
-JNIEXPORT jobject JNICALL
-Java_org_apache_gluten_datasource_DatasourceJniWrapper_splitBlockByPartitionAndBucket(
 // NOLINT
-    JNIEnv* env,
-    jobject wrapper,
-    jlong batchHandle,
-    jintArray partitionColIndice,
-    jboolean hasBucket,
-    jlong memoryManagerId) {
-  JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  auto safeArray = gluten::getIntArrayElementsSafe(env, partitionColIndice);
-  int size = env->GetArrayLength(partitionColIndice);
-  std::vector<int32_t> partitionColIndiceVec;
-  for (int i = 0; i < size; ++i) {
-    partitionColIndiceVec.push_back(safeArray.elems()[i]);
-  }
-
-  auto result = batch->toUnsafeRow(0);
-  auto rowBytes = result.data();
-  auto newBatchHandle = ctx->saveObject(ctx->select(batch, 
partitionColIndiceVec));
-
-  auto bytesSize = result.size();
-  jbyteArray bytesArray = env->NewByteArray(bytesSize);
-  env->SetByteArrayRegion(bytesArray, 0, bytesSize, 
reinterpret_cast<jbyte*>(rowBytes));
-
-  jlongArray batchArray = env->NewLongArray(1);
-  long* cBatchArray = new long[1];
-  cBatchArray[0] = newBatchHandle;
-  env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray);
-  delete[] cBatchArray;
-
-  jobject blockStripes = env->NewObject(
-      blockStripesClass, blockStripesConstructor, batchHandle, batchArray, 
nullptr, batch->numColumns(), bytesArray);
-  return blockStripes;
-  JNI_METHOD_END(nullptr)
-}
-
 JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_serialize( 
// NOLINT
     JNIEnv* env,
     jobject wrapper,
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 62ddecc571..39b3f46b06 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -187,7 +187,7 @@ set(VELOX_SRCS
     operators/serializer/VeloxColumnarToRowConverter.cc
     operators/serializer/VeloxColumnarBatchSerializer.cc
     operators/serializer/VeloxRowToColumnarConverter.cc
-    operators/writer/VeloxParquetDatasource.cc
+    operators/writer/VeloxParquetDataSource.cc
     shuffle/VeloxShuffleReader.cc
     shuffle/VeloxShuffleWriter.cc
     shuffle/VeloxHashShuffleWriter.cc
diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc 
b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
index b1118b6ae0..2369a13bd2 100644
--- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
+++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc
@@ -263,20 +263,20 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark 
: public GoogleBenchmar
 
     for (auto _ : state) {
       // Init VeloxParquetDataSource
-      auto veloxParquetDatasource = 
std::make_unique<gluten::VeloxParquetDatasource>(
+      auto veloxParquetDataSource = 
std::make_unique<gluten::VeloxParquetDataSource>(
           outputPath_ + "/" + fileName,
           veloxPool->addAggregateChild("writer_benchmark"),
           veloxPool->addLeafChild("sink_pool"),
           localSchema);
 
-      veloxParquetDatasource->init(runtime->getConfMap());
+      veloxParquetDataSource->init(runtime->getConfMap());
       auto start = std::chrono::steady_clock::now();
       for (const auto& vector : vectors) {
-        veloxParquetDatasource->write(vector);
+        veloxParquetDataSource->write(vector);
       }
       auto end = std::chrono::steady_clock::now();
       writeTime += std::chrono::duration_cast<std::chrono::nanoseconds>(end - 
start).count();
-      veloxParquetDatasource->close();
+      veloxParquetDataSource->close();
     }
 
     state.counters["rowgroups"] =
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 5b93fb7d53..e93dc2367e 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -36,20 +36,20 @@
 
 #ifdef ENABLE_HDFS
 
-#include "operators/writer/VeloxParquetDatasourceHDFS.h"
+#include "operators/writer/VeloxParquetDataSourceHDFS.h"
 
 #endif
 
 #ifdef ENABLE_S3
-#include "operators/writer/VeloxParquetDatasourceS3.h"
+#include "operators/writer/VeloxParquetDataSourceS3.h"
 #endif
 
 #ifdef ENABLE_GCS
-#include "operators/writer/VeloxParquetDatasourceGCS.h"
+#include "operators/writer/VeloxParquetDataSourceGCS.h"
 #endif
 
 #ifdef ENABLE_ABFS
-#include "operators/writer/VeloxParquetDatasourceABFS.h"
+#include "operators/writer/VeloxParquetDataSourceABFS.h"
 #endif
 
 using namespace facebook;
@@ -204,44 +204,45 @@ std::shared_ptr<ShuffleWriter> 
VeloxRuntime::createShuffleWriter(
   return shuffleWriter;
 }
 
-std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
+std::shared_ptr<VeloxDataSource> VeloxRuntime::createDataSource(
     const std::string& filePath,
     std::shared_ptr<arrow::Schema> schema) {
   static std::atomic_uint32_t id{0UL};
   auto veloxPool = 
vmm_->getAggregateMemoryPool()->addAggregateChild("datasource." + 
std::to_string(id++));
   // Pass a dedicate pool for S3 and GCS sinks as can't share veloxPool
   // with parquet writer.
+  // FIXME: Check file formats?
   auto sinkPool = vmm_->getLeafMemoryPool();
   if (isSupportedHDFSPath(filePath)) {
 #ifdef ENABLE_HDFS
-    return std::make_shared<VeloxParquetDatasourceHDFS>(filePath, veloxPool, 
sinkPool, schema);
+    return std::make_shared<VeloxParquetDataSourceHDFS>(filePath, veloxPool, 
sinkPool, schema);
 #else
     throw std::runtime_error(
         "The write path is hdfs path but the HDFS haven't been enabled when 
writing parquet data in velox runtime!");
 #endif
   } else if (isSupportedS3SdkPath(filePath)) {
 #ifdef ENABLE_S3
-    return std::make_shared<VeloxParquetDatasourceS3>(filePath, veloxPool, 
sinkPool, schema);
+    return std::make_shared<VeloxParquetDataSourceS3>(filePath, veloxPool, 
sinkPool, schema);
 #else
     throw std::runtime_error(
         "The write path is S3 path but the S3 haven't been enabled when 
writing parquet data in velox runtime!");
 #endif
   } else if (isSupportedGCSPath(filePath)) {
 #ifdef ENABLE_GCS
-    return std::make_shared<VeloxParquetDatasourceGCS>(filePath, veloxPool, 
sinkPool, schema);
+    return std::make_shared<VeloxParquetDataSourceGCS>(filePath, veloxPool, 
sinkPool, schema);
 #else
     throw std::runtime_error(
         "The write path is GCS path but the GCS haven't been enabled when 
writing parquet data in velox runtime!");
 #endif
   } else if (isSupportedABFSPath(filePath)) {
 #ifdef ENABLE_ABFS
-    return std::make_shared<VeloxParquetDatasourceABFS>(filePath, veloxPool, 
sinkPool, schema);
+    return std::make_shared<VeloxParquetDataSourceABFS>(filePath, veloxPool, 
sinkPool, schema);
 #else
     throw std::runtime_error(
         "The write path is ABFS path but the ABFS haven't been enabled when 
writing parquet data in velox runtime!");
 #endif
   }
-  return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool, 
sinkPool, schema);
+  return std::make_shared<VeloxParquetDataSource>(filePath, veloxPool, 
sinkPool, schema);
 }
 
 std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 74f59639d7..405bbae635 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -22,7 +22,7 @@
 #include "memory/VeloxMemoryManager.h"
 #include "operators/serializer/VeloxColumnarBatchSerializer.h"
 #include "operators/serializer/VeloxColumnarToRowConverter.h"
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 #include "shuffle/ShuffleReader.h"
 #include "shuffle/ShuffleWriter.h"
 
@@ -68,8 +68,7 @@ class VeloxRuntime final : public Runtime {
     return iter->getMetrics(exportNanos);
   }
 
-  std::shared_ptr<Datasource> createDatasource(const std::string& filePath, 
std::shared_ptr<arrow::Schema> schema)
-      override;
+  std::shared_ptr<VeloxDataSource> createDataSource(const std::string& 
filePath, std::shared_ptr<arrow::Schema> schema);
 
   std::shared_ptr<ShuffleReader> createShuffleReader(
       std::shared_ptr<arrow::Schema> schema,
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 22136ad297..b8d2b0c3c2 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -38,22 +38,30 @@
 
 #include <iostream>
 
+using namespace gluten;
 using namespace facebook;
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
+static jclass blockStripesClass;
+static jmethodID blockStripesConstructor;
+
 jint JNI_OnLoad(JavaVM* vm, void*) {
   JNIEnv* env;
   if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
     return JNI_ERR;
   }
 
-  gluten::getJniCommonState()->ensureInitialized(env);
-  gluten::getJniErrorState()->ensureInitialized(env);
-  gluten::initVeloxJniFileSystem(env);
-  gluten::initVeloxJniUDF(env);
+  getJniCommonState()->ensureInitialized(env);
+  getJniErrorState()->ensureInitialized(env);
+  initVeloxJniFileSystem(env);
+  initVeloxJniUDF(env);
+
+  blockStripesClass =
+      createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
+  blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", 
"(J[J[II[B)V");
 
   DLOG(INFO) << "Loaded Velox backend.";
 
@@ -63,10 +71,13 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
 void JNI_OnUnload(JavaVM* vm, void*) {
   JNIEnv* env;
   vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion);
-  gluten::finalizeVeloxJniUDF(env);
-  gluten::finalizeVeloxJniFileSystem(env);
-  gluten::getJniErrorState()->close();
-  gluten::getJniCommonState()->close();
+
+  env->DeleteGlobalRef(blockStripesClass);
+
+  finalizeVeloxJniUDF(env);
+  finalizeVeloxJniFileSystem(env);
+  getJniErrorState()->close();
+  getJniCommonState()->close();
   google::ShutdownGoogleLogging();
 }
 
@@ -75,9 +86,9 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_init_NativeBackendInitializer_init
     jclass,
     jbyteArray conf) {
   JNI_METHOD_START
-  auto safeArray = gluten::getByteArrayElementsSafe(env, conf);
-  auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), 
safeArray.length());
-  gluten::VeloxBackend::create(sparkConf);
+  auto safeArray = getByteArrayElementsSafe(env, conf);
+  auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
+  VeloxBackend::create(sparkConf);
   JNI_METHOD_END()
 }
 
@@ -85,7 +96,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_init_NativeBackendInitializer_shut
     JNIEnv* env,
     jclass) {
   JNI_METHOD_START
-  gluten::VeloxBackend::get()->tearDown();
+  VeloxBackend::get()->tearDown();
   JNI_METHOD_END()
 }
 
@@ -93,7 +104,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_udf_UdfJniWrapper_registerFunction
     JNIEnv* env,
     jclass) {
   JNI_METHOD_START
-  gluten::jniRegisterFunctionSignatures(env);
+  jniRegisterFunctionSignatures(env);
   JNI_METHOD_END()
 }
 
@@ -103,14 +114,14 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
     jobject wrapper,
     jbyteArray planArray) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto safeArray = gluten::getByteArrayElementsSafe(env, planArray);
+  auto ctx = getRuntime(env, wrapper);
+  auto safeArray = getByteArrayElementsSafe(env, planArray);
   auto planData = safeArray.elems();
   auto planSize = env->GetArrayLength(planArray);
-  auto runtime = dynamic_cast<gluten::VeloxRuntime*>(ctx);
+  auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
   if (runtime->debugModeEnabled()) {
     try {
-      auto jsonPlan = gluten::substraitFromPbToJson("Plan", planData, 
planSize, std::nullopt);
+      auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize, 
std::nullopt);
       LOG(INFO) << std::string(50, '#') << " received substrait::Plan: for 
validation";
       LOG(INFO) << jsonPlan;
     } catch (const std::exception& e) {
@@ -119,21 +130,21 @@ 
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
   }
 
   ::substrait::Plan subPlan;
-  gluten::parseProtobuf(planData, planSize, &subPlan);
+  parseProtobuf(planData, planSize, &subPlan);
 
   // A query context with dummy configs. Used for function validation.
   std::unordered_map<std::string, std::string> configs{
       {velox::core::QueryConfig::kSparkPartitionId, "0"}, 
{velox::core::QueryConfig::kSessionTimezone, "GMT"}};
   auto queryCtx = velox::core::QueryCtx::create(nullptr, 
velox::core::QueryConfig(configs));
-  auto pool = gluten::defaultLeafVeloxMemoryPool().get();
+  auto pool = defaultLeafVeloxMemoryPool().get();
   // An execution context used for function validation.
   velox::core::ExecCtx execCtx(pool, queryCtx.get());
 
-  gluten::SubstraitToVeloxPlanValidator planValidator(pool, &execCtx);
+  SubstraitToVeloxPlanValidator planValidator(pool, &execCtx);
   jclass infoCls = 
env->FindClass("Lorg/apache/gluten/validate/NativePlanValidationInfo;");
   if (infoCls == nullptr) {
     std::string errorMessage = "Unable to CreateGlobalClassReferenceOrError 
for NativePlanValidationInfo";
-    throw gluten::GlutenException(errorMessage);
+    throw GlutenException(errorMessage);
   }
   jmethodID method = env->GetMethodID(infoCls, "<init>", 
"(ILjava/lang/String;)V");
   try {
@@ -158,13 +169,13 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ
     jobject wrapper,
     jlong handle) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto runtime = dynamic_cast<gluten::VeloxRuntime*>(ctx);
+  auto ctx = getRuntime(env, wrapper);
+  auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
 
-  auto batch = gluten::ObjectStore::retrieve<gluten::ColumnarBatch>(handle);
-  auto newBatch = 
gluten::VeloxColumnarBatch::from(runtime->memoryManager()->getLeafMemoryPool().get(),
 batch);
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
+  auto newBatch = 
VeloxColumnarBatch::from(runtime->memoryManager()->getLeafMemoryPool().get(), 
batch);
   return ctx->saveObject(newBatch);
-  JNI_METHOD_END(gluten::kInvalidObjectHandle)
+  JNI_METHOD_END(kInvalidObjectHandle)
 }
 
 JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_compose( // 
NOLINT
@@ -172,22 +183,21 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ
     jobject wrapper,
     jlongArray batchHandles) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto runtime = dynamic_cast<gluten::VeloxRuntime*>(ctx);
+  auto ctx = getRuntime(env, wrapper);
+  auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
 
   int handleCount = env->GetArrayLength(batchHandles);
-  auto safeArray = gluten::getLongArrayElementsSafe(env, batchHandles);
+  auto safeArray = getLongArrayElementsSafe(env, batchHandles);
 
-  std::vector<std::shared_ptr<gluten::ColumnarBatch>> batches;
+  std::vector<std::shared_ptr<ColumnarBatch>> batches;
   for (int i = 0; i < handleCount; ++i) {
     int64_t handle = safeArray.elems()[i];
-    auto batch = gluten::ObjectStore::retrieve<gluten::ColumnarBatch>(handle);
+    auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
     batches.push_back(batch);
   }
-  auto newBatch =
-      
gluten::VeloxColumnarBatch::compose(runtime->memoryManager()->getLeafMemoryPool().get(),
 std::move(batches));
+  auto newBatch = 
VeloxColumnarBatch::compose(runtime->memoryManager()->getLeafMemoryPool().get(),
 std::move(batches));
   return ctx->saveObject(newBatch);
-  JNI_METHOD_END(gluten::kInvalidObjectHandle)
+  JNI_METHOD_END(kInvalidObjectHandle)
 }
 
 JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_empty( // NOLINT
@@ -195,12 +205,12 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_
     jobject wrapper,
     jint capacity) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
+  auto ctx = getRuntime(env, wrapper);
   auto filter = 
std::make_shared<velox::BloomFilter<std::allocator<uint64_t>>>();
   filter->reset(capacity);
   GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
   return ctx->saveObject(filter);
-  JNI_METHOD_END(gluten::kInvalidObjectHandle)
+  JNI_METHOD_END(kInvalidObjectHandle)
 }
 
 JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_init( // NOLINT
@@ -208,13 +218,13 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_
     jobject wrapper,
     jbyteArray data) {
   JNI_METHOD_START
-  auto safeArray = gluten::getByteArrayElementsSafe(env, data);
-  auto ctx = gluten::getRuntime(env, wrapper);
+  auto safeArray = getByteArrayElementsSafe(env, data);
+  auto ctx = getRuntime(env, wrapper);
   auto filter = 
std::make_shared<velox::BloomFilter<std::allocator<uint64_t>>>();
   uint8_t* serialized = safeArray.elems();
   filter->merge(reinterpret_cast<char*>(serialized));
   return ctx->saveObject(filter);
-  JNI_METHOD_END(gluten::kInvalidObjectHandle)
+  JNI_METHOD_END(kInvalidObjectHandle)
 }
 
 JNIEXPORT void JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_insertLong( // NOLINT
@@ -223,7 +233,7 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_i
     jlong handle,
     jlong item) {
   JNI_METHOD_START
-  auto filter = 
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+  auto filter = 
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
   GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
   filter->insert(folly::hasher<int64_t>()(item));
   JNI_METHOD_END()
@@ -235,7 +245,7 @@ JNIEXPORT jboolean JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapp
     jlong handle,
     jlong item) {
   JNI_METHOD_START
-  auto filter = 
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+  auto filter = 
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
   GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
   bool out = filter->mayContain(folly::hasher<int64_t>()(item));
   return out;
@@ -259,8 +269,8 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_m
     jlong handle,
     jlong other) {
   JNI_METHOD_START
-  auto to = 
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
-  auto from = 
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(other);
+  auto to = 
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+  auto from = 
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(other);
   GLUTEN_CHECK(to->isSet(), "Bloom-filter is not initialized");
   GLUTEN_CHECK(from->isSet(), "Bloom-filter is not initialized");
   std::vector<char> serialized = serialize(from.get());
@@ -273,7 +283,7 @@ JNIEXPORT jbyteArray JNICALL 
Java_org_apache_gluten_utils_VeloxBloomFilterJniWra
     jobject wrapper,
     jlong handle) {
   JNI_METHOD_START
-  auto filter = 
gluten::ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
+  auto filter = 
ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle);
   GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized");
   std::vector<char> buffer = serialize(filter.get());
   auto size = buffer.capacity();
@@ -290,13 +300,13 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper
     jint maxOutputBatchSize,
     jobject jIter) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto pool = 
dynamic_cast<gluten::VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
-  auto iter = gluten::makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
-  auto appender = std::make_shared<gluten::ResultIterator>(
-      std::make_unique<gluten::VeloxBatchResizer>(pool.get(), 
minOutputBatchSize, maxOutputBatchSize, std::move(iter)));
+  auto ctx = getRuntime(env, wrapper);
+  auto pool = 
dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
+  auto iter = makeJniColumnarBatchIterator(env, jIter, ctx, nullptr);
+  auto appender = std::make_shared<ResultIterator>(
+      std::make_unique<VeloxBatchResizer>(pool.get(), minOutputBatchSize, 
maxOutputBatchSize, std::move(iter)));
   return ctx->saveObject(appender);
-  JNI_METHOD_END(gluten::kInvalidObjectHandle)
+  JNI_METHOD_END(kInvalidObjectHandle)
 }
 
 JNIEXPORT jboolean JNICALL
@@ -317,6 +327,109 @@ 
Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByR
   JNI_METHOD_END(false)
 }
 
+JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_init( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jstring filePath,
+    jlong cSchema,
+    jbyteArray options) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
+
+  ObjectHandle handle = kInvalidObjectHandle;
+
+  if (cSchema == -1) {
+    // Only inspect the schema and not write
+    handle = ctx->saveObject(runtime->createDataSource(jStringToCString(env, 
filePath), nullptr));
+  } else {
+    auto safeArray = gluten::getByteArrayElementsSafe(env, options);
+    auto datasourceOptions = gluten::parseConfMap(env, safeArray.elems(), 
safeArray.length());
+    auto& sparkConf = ctx->getConfMap();
+    datasourceOptions.insert(sparkConf.begin(), sparkConf.end());
+    auto schema = 
gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct 
ArrowSchema*>(cSchema)));
+    handle = ctx->saveObject(runtime->createDataSource(jStringToCString(env, 
filePath), schema));
+    auto datasource = ObjectStore::retrieve<VeloxDataSource>(handle);
+    datasource->init(datasourceOptions);
+  }
+
+  return handle;
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_inspectSchema( // 
NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong dsHandle,
+    jlong cSchema) {
+  JNI_METHOD_START
+  auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
+  datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema));
+  JNI_METHOD_END()
+}
+
+JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_close( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong dsHandle) {
+  JNI_METHOD_START
+  auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
+  datasource->close();
+  ObjectStore::release(dsHandle);
+  JNI_METHOD_END()
+}
+
+JNIEXPORT void JNICALL 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_writeBatch( // 
NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong dsHandle,
+    jlong batchHandle) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+  datasource->write(batch);
+  JNI_METHOD_END()
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitionAndBucket(
 // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong batchHandle,
+    jintArray partitionColIndice,
+    jboolean hasBucket,
+    jlong memoryManagerId) {
+  JNI_METHOD_START
+  auto ctx = gluten::getRuntime(env, wrapper);
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+  auto safeArray = gluten::getIntArrayElementsSafe(env, partitionColIndice);
+  int size = env->GetArrayLength(partitionColIndice);
+  std::vector<int32_t> partitionColIndiceVec;
+  for (int i = 0; i < size; ++i) {
+    partitionColIndiceVec.push_back(safeArray.elems()[i]);
+  }
+
+  auto result = batch->toUnsafeRow(0);
+  auto rowBytes = result.data();
+  auto newBatchHandle = ctx->saveObject(ctx->select(batch, 
partitionColIndiceVec));
+
+  auto bytesSize = result.size();
+  jbyteArray bytesArray = env->NewByteArray(bytesSize);
+  env->SetByteArrayRegion(bytesArray, 0, bytesSize, 
reinterpret_cast<jbyte*>(rowBytes));
+
+  jlongArray batchArray = env->NewLongArray(1);
+  long* cBatchArray = new long[1];
+  cBatchArray[0] = newBatchHandle;
+  env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray);
+  delete[] cBatchArray;
+
+  jobject blockStripes = env->NewObject(
+      blockStripesClass, blockStripesConstructor, batchHandle, batchArray, 
nullptr, batch->numColumns(), bytesArray);
+  return blockStripes;
+  JNI_METHOD_END(nullptr)
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/cpp/core/operators/writer/Datasource.h 
b/cpp/velox/operators/writer/VeloxDataSource.h
similarity index 89%
rename from cpp/core/operators/writer/Datasource.h
rename to cpp/velox/operators/writer/VeloxDataSource.h
index 0a14bcf7bb..043f1b8b17 100644
--- a/cpp/core/operators/writer/Datasource.h
+++ b/cpp/velox/operators/writer/VeloxDataSource.h
@@ -24,16 +24,15 @@
 #include <arrow/type_fwd.h>
 
 #include "memory/ColumnarBatch.h"
-#include "operators/writer/Datasource.h"
 
 namespace gluten {
 
-class Datasource {
+class VeloxDataSource {
  public:
-  Datasource(const std::string& filePath, std::shared_ptr<arrow::Schema> 
schema)
+  VeloxDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema> 
schema)
       : filePath_(filePath), schema_(schema) {}
 
-  virtual ~Datasource() = default;
+  virtual ~VeloxDataSource() = default;
 
   virtual void init(const std::unordered_map<std::string, std::string>& 
sparkConfs) {}
   virtual void inspectSchema(struct ArrowSchema* out) = 0;
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.cc 
b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
similarity index 94%
rename from cpp/velox/operators/writer/VeloxParquetDatasource.cc
rename to cpp/velox/operators/writer/VeloxParquetDataSource.cc
index c538a04370..aeec1b4c82 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.cc
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.cc
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-#include "VeloxParquetDatasource.h"
+#include "VeloxParquetDataSource.h"
 
 #include <arrow/buffer.h>
 #include <cstring>
@@ -43,7 +43,7 @@ namespace {
 const int32_t kGzipWindowBits4k = 12;
 }
 
-void VeloxParquetDatasource::initSink(const std::unordered_map<std::string, 
std::string>& /* sparkConfs */) {
+void VeloxParquetDataSource::initSink(const std::unordered_map<std::string, 
std::string>& /* sparkConfs */) {
   if (strncmp(filePath_.c_str(), "file:", 5) == 0) {
     sink_ = dwio::common::FileSink::create(filePath_, {.pool = pool_.get()});
   } else {
@@ -51,7 +51,7 @@ void VeloxParquetDatasource::initSink(const 
std::unordered_map<std::string, std:
   }
 }
 
-void VeloxParquetDatasource::init(const std::unordered_map<std::string, 
std::string>& sparkConfs) {
+void VeloxParquetDataSource::init(const std::unordered_map<std::string, 
std::string>& sparkConfs) {
   initSink(sparkConfs);
 
   if (sparkConfs.find(kParquetBlockSize) != sparkConfs.end()) {
@@ -103,7 +103,7 @@ void VeloxParquetDatasource::init(const 
std::unordered_map<std::string, std::str
   parquetWriter_ = std::make_unique<velox::parquet::Writer>(std::move(sink_), 
writeOption, pool_, asRowType(schema));
 }
 
-void VeloxParquetDatasource::inspectSchema(struct ArrowSchema* out) {
+void VeloxParquetDataSource::inspectSchema(struct ArrowSchema* out) {
   velox::dwio::common::ReaderOptions readerOptions(pool_.get());
   auto format = velox::dwio::common::FileFormat::PARQUET;
   readerOptions.setFileFormat(format);
@@ -121,13 +121,13 @@ void VeloxParquetDatasource::inspectSchema(struct 
ArrowSchema* out) {
   toArrowSchema(reader->rowType(), pool_.get(), out);
 }
 
-void VeloxParquetDatasource::close() {
+void VeloxParquetDataSource::close() {
   if (parquetWriter_) {
     parquetWriter_->close();
   }
 }
 
-void VeloxParquetDatasource::write(const std::shared_ptr<ColumnarBatch>& cb) {
+void VeloxParquetDataSource::write(const std::shared_ptr<ColumnarBatch>& cb) {
   auto veloxBatch = std::dynamic_pointer_cast<VeloxColumnarBatch>(cb);
   VELOX_DCHECK(veloxBatch != nullptr, "Write batch should be 
VeloxColumnarBatch");
   parquetWriter_->write(veloxBatch->getFlattenedRowVector());
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasource.h 
b/cpp/velox/operators/writer/VeloxParquetDataSource.h
similarity index 94%
rename from cpp/velox/operators/writer/VeloxParquetDatasource.h
rename to cpp/velox/operators/writer/VeloxParquetDataSource.h
index 15541f8a87..e7428999f0 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasource.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSource.h
@@ -29,7 +29,7 @@
 
 #include "memory/ColumnarBatch.h"
 #include "memory/VeloxColumnarBatch.h"
-#include "operators/writer/Datasource.h"
+#include "operators/writer/VeloxDataSource.h"
 
 #include "velox/common/file/FileSystems.h"
 #ifdef ENABLE_S3
@@ -79,14 +79,14 @@ inline bool isSupportedABFSPath(const std::string& 
filePath) {
   return strncmp(filePath.c_str(), "abfs:", 5) == 0 || 
strncmp(filePath.c_str(), "abfss:", 6) == 0;
 }
 
-class VeloxParquetDatasource : public Datasource {
+class VeloxParquetDataSource : public VeloxDataSource {
  public:
-  VeloxParquetDatasource(
+  VeloxParquetDataSource(
       const std::string& filePath,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
-      : Datasource(filePath, schema), filePath_(filePath), schema_(schema), 
pool_(std::move(veloxPool)) {}
+      : VeloxDataSource(filePath, schema), filePath_(filePath), 
schema_(schema), pool_(std::move(veloxPool)) {}
 
   void init(const std::unordered_map<std::string, std::string>& sparkConfs) 
override;
   virtual void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs);
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
similarity index 89%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
index b542e42bab..63658cc54e 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceABFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceABFS.h
@@ -17,7 +17,7 @@
 
 #pragma once
 
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
@@ -33,14 +33,14 @@
 
 namespace gluten {
 
-class VeloxParquetDatasourceABFS final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceABFS final : public VeloxParquetDataSource {
  public:
-  VeloxParquetDatasourceABFS(
+  VeloxParquetDataSourceABFS(
       const std::string& filePath,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
-      : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+      : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
     auto hiveConf = 
getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceGCS.h
similarity index 89%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceGCS.h
index 0c2bfa2138..22d4e96efe 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceGCS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceGCS.h
@@ -17,7 +17,7 @@
 
 #pragma once
 
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
@@ -32,14 +32,14 @@
 #include "velox/dwio/common/Options.h"
 
 namespace gluten {
-class VeloxParquetDatasourceGCS final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceGCS final : public VeloxParquetDataSource {
  public:
-  VeloxParquetDatasourceGCS(
+  VeloxParquetDataSourceGCS(
       const std::string& filePath,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
-      : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+      : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& /* 
sparkConfs */) override {
     auto fileSystem = filesystems::getFileSystem(filePath_, nullptr);
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
similarity index 88%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
index 19e9e35606..053b3da2ff 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceHDFS.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceHDFS.h
@@ -17,7 +17,7 @@
 
 #pragma once
 
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
@@ -33,14 +33,14 @@
 
 namespace gluten {
 
-class VeloxParquetDatasourceHDFS final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceHDFS final : public VeloxParquetDataSource {
  public:
-  VeloxParquetDatasourceHDFS(
+  VeloxParquetDataSourceHDFS(
       const std::string& filePath,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
-      : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+      : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
     auto hiveConf = 
getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
diff --git a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h 
b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
similarity index 89%
rename from cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
rename to cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
index 8219fe42a3..3082f82a91 100644
--- a/cpp/velox/operators/writer/VeloxParquetDatasourceS3.h
+++ b/cpp/velox/operators/writer/VeloxParquetDataSourceS3.h
@@ -17,7 +17,7 @@
 
 #pragma once
 
-#include "operators/writer/VeloxParquetDatasource.h"
+#include "operators/writer/VeloxParquetDataSource.h"
 #include "utils/ConfigExtractor.h"
 #include "utils/VeloxArrowUtils.h"
 
@@ -33,14 +33,14 @@
 
 namespace gluten {
 
-class VeloxParquetDatasourceS3 final : public VeloxParquetDatasource {
+class VeloxParquetDataSourceS3 final : public VeloxParquetDataSource {
  public:
-  VeloxParquetDatasourceS3(
+  VeloxParquetDataSourceS3(
       const std::string& filePath,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> sinkPool,
       std::shared_ptr<arrow::Schema> schema)
-      : VeloxParquetDatasource(filePath, veloxPool, sinkPool, schema) {}
+      : VeloxParquetDataSource(filePath, veloxPool, sinkPool, schema) {}
 
   void initSink(const std::unordered_map<std::string, std::string>& 
sparkConfs) override {
     auto hiveConf = 
getHiveConfig(std::make_shared<facebook::velox::config::ConfigBase>(
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index b2aa3e1a86..0135b35dcd 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -77,10 +77,6 @@ class DummyRuntime final : public Runtime {
     static Metrics m(1);
     return &m;
   }
-  std::shared_ptr<Datasource> createDatasource(const std::string& filePath, 
std::shared_ptr<arrow::Schema> schema)
-      override {
-    throw GlutenException("Not yet implemented");
-  }
   std::shared_ptr<ShuffleReader> createShuffleReader(
       std::shared_ptr<arrow::Schema> schema,
       ShuffleReaderOptions options) override {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to