This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 65ad57add7 [Gluten-9254][CH] Support RDDScanExec (#9270)
65ad57add7 is described below

commit 65ad57add72e665b4e07b2d994347c9881d9fc13
Author: Shuai li <loney...@live.cn>
AuthorDate: Tue Apr 15 16:35:27 2025 +0800

    [Gluten-9254][CH] Support RDDScanExec (#9270)
    
    * [Gluten-9254][CH] Support RDDScanExec
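
    Illustrative sketch (not part of this patch): an RDDScanExec ("Scan ExistingRDD")
    typically shows up when a DataFrame is built directly from an RDD[Row]; with this
    change the ClickHouse backend can offload such a node through CHRDDScanTransformer
    instead of falling back to vanilla Spark. A minimal, hedged example assuming a
    local SparkSession:

        import org.apache.spark.sql.{Row, SparkSession}
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        val schema = StructType(Seq(StructField("id", IntegerType), StructField("v", StringType)))
        val rdd = spark.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
        // createDataFrame over an RDD[Row] usually plans into an RDDScanExec node.
        val df = spark.createDataFrame(rdd, schema)
        df.groupBy("v").count().explain()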
---
 .../merge/MergeIntoMaterializeSource.scala         | 499 +++++++++++++++++++++
 .../apache/spark/sql/execution/RDDScanSuite.scala  |  86 ++++
 .../apache/gluten/component/CHKafkaComponent.scala |   2 +
 .../columnar/KafkaMiscColumnarRules.scala          |  47 ++
 .../gluten/vectorized/BlockOutputStream.java       |   2 +
 .../apache/gluten/vectorized/CHNativeBlock.java    |   6 +
 .../apache/gluten/vectorized/CHStreamReader.java   |   2 +
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   1 +
 .../clickhouse/CHSparkPlanExecApi.scala            |  30 +-
 .../extension/CHRemoveTopmostColumnarToRow.scala   |  71 +++
 .../spark/sql/execution/CHRDDScanTransformer.scala | 133 ++++++
 .../execution/GlutenFunctionValidateSuite.scala    |  14 +-
 .../gluten/execution/GlutenNothingValueCheck.scala |  18 +-
 .../GlutenClickhouseFunctionSuite.scala            |  12 +-
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala |  51 +--
 .../AggregateFunctionDVRoaringBitmap.h             |   1 -
 cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp  |   2 +-
 cpp-ch/local-engine/Shuffle/ShuffleReader.cpp      |  14 +-
 cpp-ch/local-engine/Shuffle/ShuffleReader.h        |  12 +-
 cpp-ch/local-engine/Storages/IO/NativeReader.cpp   |  59 ++-
 cpp-ch/local-engine/Storages/IO/NativeReader.h     |  24 +-
 .../Storages/SubstraitSource/Delta/DeltaWriter.cpp |  15 +-
 .../Storages/SubstraitSource/Delta/DeltaWriter.h   |   2 +-
 cpp-ch/local-engine/local_engine_jni.cpp           |  59 ++-
 .../gluten/extension/caller/CallerInfo.scala       |  22 +-
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  15 +
 .../columnar/offload/OffloadSingleNodeRules.scala  |   4 +
 .../spark/sql/execution/RDDScanTransformer.scala   |  52 +++
 .../execution/datasources/FakeRowEnhancement.scala |  44 ++
 .../datasources/GlutenWriterColumnarRules.scala    |   2 +-
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   1 +
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   3 +
 .../spark/sql/execution/datasources/FakeRow.scala  |   2 +-
 33 files changed, 1198 insertions(+), 109 deletions(-)

diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
new file mode 100644
index 0000000000..f1efab98df
--- /dev/null
+++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands.merge
+
+import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+
+import scala.annotation.tailrec
+import scala.util.control.NonFatal
+
+/**
+ * Trait with logic and utilities used for materializing a snapshot of MERGE source in case we can't
+ * guarantee deterministic repeated reads from it.
+ *
+ * We materialize source if it is not safe to assume that it's deterministic (override with
+ * MERGE_SOURCE_MATERIALIZATION). Otherwise, if source changes between the phases of the MERGE, it
+ * can produce wrong results. We use local checkpointing for the materialization, which saves the
+ * source as a materialized RDD[InternalRow] on the executor local disks.
+ *
+ * 1st concern is that if an executor is lost, this data can be lost. When Spark executor
+ * decommissioning API is used, it should attempt to move this materialized data safely out before
+ * removing the executor.
+ *
+ * 2nd concern is that if an executor is lost for another reason (e.g. spot kill), we will still
+ * lose that data. To mitigate that, we implement a retry loop. The whole Merge operation needs to
+ * be restarted from the beginning in this case. When we retry, we increase the replication level of
+ * the materialized data from 1 to 2. (override with
+ * MERGE_SOURCE_MATERIALIZATION_RDD_STORAGE_LEVEL_RETRY). If it still fails after the maximum number
+ * of attempts (MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS), we record the failure for tracking purposes.
+ *
+ * 3rd concern is that executors run out of disk space with the extra materialization. We record
+ * such failures for tracking purposes.
+ */
+trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
+
+  import MergeIntoMaterializeSource._
+
+  /**
+   * Prepared Dataframe with source data. If needed, it is materialized, @see prepareMergeSource
+   */
+  private var mergeSource: Option[MergeSource] = None
+
+  /** If the source was materialized, reference to the checkpointed RDD. */
+  protected var materializedSourceRDD: Option[RDD[InternalRow]] = None
+
+  /** Track which attempt or retry it is in runWithMaterializedSourceAndRetries */
+  protected var attempt: Int = 0
+
+  /**
+   * Run the Merge with retries in case it detects an RDD block lost error of the materialized
+   * source RDD. It will also record out of disk error, if such happens - possibly because of
+   * increased disk pressure from the materialized source RDD.
+   */
+  protected def runWithMaterializedSourceLostRetries(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      metrics: Map[String, SQLMetric],
+      runMergeFunc: SparkSession => Seq[Row]): Seq[Row] = {
+    var doRetry = false
+    var runResult: Seq[Row] = null
+    attempt = 1
+    do {
+      doRetry = false
+      metrics.values.foreach(_.reset())
+      try {
+        runResult = runMergeFunc(spark)
+      } catch {
+        case NonFatal(ex) =>
+          val isLastAttempt =
+            attempt == spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS)
+          handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match {
+            case RetryHandling.Retry =>
+              logInfo(s"Retrying MERGE with materialized source. Attempt $attempt failed.")
+              doRetry = true
+              attempt += 1
+            case RetryHandling.ExhaustedRetries =>
+              logError(
+                s"Exhausted retries after $attempt attempts in MERGE with" +
+                  s" materialized source. Logging latest exception.",
+                ex)
+              throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge
+            case RetryHandling.RethrowException =>
+              logError(s"Fatal error in MERGE with materialized source in attempt $attempt.", ex)
+              throw ex
+          }
+      } finally {
+        // Remove source from RDD cache (noop if wasn't cached)
+        materializedSourceRDD.foreach(rdd => rdd.unpersist())
+        materializedSourceRDD = None
+        mergeSource = None
+      }
+    } while (doRetry)
+
+    runResult
+  }
+
+  object RetryHandling extends Enumeration {
+    type Result = Value
+
+    val Retry, RethrowException, ExhaustedRetries = Value
+  }
+
+  /**
+   * Handle exception that was thrown from runMerge(). Search for errors to log, or that can be
+   * handled by retry. It may need to descend into ex.getCause() to find the errors, as Spark may
+   * have wrapped them.
+   * @param isLastAttempt
+   *   indicates that it's the last allowed attempt and there shall be no retry.
+   * @return
+   *   true if the exception is handled and merge should retry, false if the caller should rethrow
+   *   the error
+   */
+  @tailrec
+  private def handleExceptionDuringAttempt(
+      ex: Throwable,
+      isLastAttempt: Boolean,
+      deltaLog: DeltaLog): RetryHandling.Result = ex match {
+    // If Merge failed because the materialized source lost blocks from the
+    // locally checkpointed RDD, we want to retry the whole operation.
+    // If a checkpointed RDD block is lost, it throws
+    // SparkCoreErrors.checkpointRDDBlockIdNotFoundError from LocalCheckpointRDD.compute.
+    case s: SparkException
+        if materializedSourceRDD.nonEmpty &&
+          s.getMessage.matches(
+            mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) =>
+      log.warn(
+        "Materialized Merge source RDD block lost. Merge needs to be restarted. " +
+          s"This was attempt number $attempt.")
+      if (!isLastAttempt) {
+        RetryHandling.Retry
+      } else {
+        // Record situations where we lost RDD materialized source blocks, despite retries.
+        recordDeltaEvent(
+          deltaLog,
+          MergeIntoMaterializeSourceError.OP_TYPE,
+          data = MergeIntoMaterializeSourceError(
+            errorType = MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString,
+            attempt = attempt,
+            materializedSourceRDDStorageLevel = materializedSourceRDD.get.getStorageLevel.toString
+          )
+        )
+        RetryHandling.ExhaustedRetries
+      }
+
+    // Record if we ran out of executor disk space when we materialized the source.
+    case s: SparkException
+        if materializedSourceRDD.nonEmpty &&
+          s.getMessage.contains("java.io.IOException: No space left on device") =>
+      // Record situations where we ran out of disk space, possibly because of the space taken
+      // by the materialized RDD.
+      recordDeltaEvent(
+        deltaLog,
+        MergeIntoMaterializeSourceError.OP_TYPE,
+        data = MergeIntoMaterializeSourceError(
+          errorType = MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString,
+          attempt = attempt,
+          materializedSourceRDDStorageLevel = materializedSourceRDD.get.getStorageLevel.toString
+        )
+      )
+      RetryHandling.RethrowException
+
+    // Descend into ex.getCause.
+    // The errors that we are looking for above might have been wrapped inside another exception.
+    case NonFatal(ex) if ex.getCause() != null =>
+      handleExceptionDuringAttempt(ex.getCause(), isLastAttempt, deltaLog)
+
+    // Descended to the bottom of the causes without finding a retryable error
+    case _ => RetryHandling.RethrowException
+  }
+
+  private def planContainsIgnoreUnreadableFilesReadOptions(plan: LogicalPlan): Boolean = {
+    def relationContainsOptions(relation: BaseRelation): Boolean = {
+      relation match {
+        case hdpRelation: HadoopFsRelation =>
+          hdpRelation.options.get(FileSourceOptions.IGNORE_CORRUPT_FILES).contains("true") ||
+          hdpRelation.options.get(FileSourceOptions.IGNORE_MISSING_FILES).contains("true")
+        case _ => false
+      }
+    }
+
+    val res = plan.collectFirst {
+      case lr: LogicalRelation if relationContainsOptions(lr.relation) => lr
+    }
+    res.nonEmpty
+  }
+
+  private def ignoreUnreadableFilesConfigsAreSet(
+      plan: LogicalPlan,
+      spark: SparkSession): Boolean = {
+    spark.conf.get(IGNORE_MISSING_FILES) || spark.conf.get(IGNORE_CORRUPT_FILES) ||
+    planContainsIgnoreUnreadableFilesReadOptions(plan)
+  }
+
+  /**
+   * @return
+   *   pair of boolean whether source should be materialized and the source materialization reason
+   */
+  protected def shouldMaterializeSource(
+      spark: SparkSession,
+      source: LogicalPlan,
+      isInsertOnly: Boolean
+  ): (Boolean, MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) = {
+    val materializeType = spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE)
+    val forceMaterializationWithUnreadableFiles =
+      spark.conf.get(DeltaSQLConf.MERGE_FORCE_SOURCE_MATERIALIZATION_WITH_UNREADABLE_FILES)
+    import DeltaSQLConf.MergeMaterializeSource._
+    val checkDeterministicOptions =
+      DeltaSparkPlanUtils.CheckDeterministicOptions(allowDeterministicUdf = true)
+    materializeType match {
+      case ALL =>
+        (true, MergeIntoMaterializeSourceReason.MATERIALIZE_ALL)
+      case NONE =>
+        (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_NONE)
+      case AUTO =>
+        if (isInsertOnly && spark.conf.get(DeltaSQLConf.MERGE_INSERT_ONLY_ENABLED)) {
+          (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO_INSERT_ONLY)
+        } else if (!planContainsOnlyDeltaScans(source)) {
+          (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_NON_DELTA)
+        } else if (!planIsDeterministic(source, checkDeterministicOptions)) {
+          (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_OPERATORS)
+          // Force source materialization if Spark configs IGNORE_CORRUPT_FILES,
+          // IGNORE_MISSING_FILES or file source read options FileSourceOptions.IGNORE_CORRUPT_FILES,
+          // FileSourceOptions.IGNORE_MISSING_FILES are enabled on the source.
+          // This is done to prevent irrecoverable data loss or unexpected results.
+        } else if (
+          forceMaterializationWithUnreadableFiles &&
+          ignoreUnreadableFilesConfigsAreSet(source, spark)
+        ) {
+          (true, MergeIntoMaterializeSourceReason.IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET)
+        } else if (planContainsUdf(source)) {
+          // Force source materialization if the source contains a User Defined Function, even if
+          // the user defined function is marked as deterministic, as it is often incorrectly marked
+          // as such.
+          (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF)
+        } else {
+          (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO)
+        }
+      case _ =>
+        // If the config is invalidly set, also materialize.
+        (true, MergeIntoMaterializeSourceReason.INVALID_CONFIG)
+    }
+  }
+
+  /**
+   * If source needs to be materialized, prepare the materialized dataframe in sourceDF. Otherwise,
+   * prepare regular dataframe.
+   * @return
+   *   the source materialization reason
+   */
+  protected def prepareMergeSource(
+      spark: SparkSession,
+      source: LogicalPlan,
+      condition: Expression,
+      matchedClauses: Seq[DeltaMergeIntoMatchedClause],
+      notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
+      isInsertOnly: Boolean): Unit = {
+    val (materialize, materializeReason) =
+      shouldMaterializeSource(spark, source, isInsertOnly)
+
+    // --- modified start
+    val originalRemoveTopmostC2R = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(spark)
+    try {
+      spark.sparkContext.setLocalProperty(
+        CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW,
+        "true")
+      // --- modified end
+
+      if (!materialize) {
+        // Does not materialize, simply return the dataframe from source plan
+        mergeSource = Some(
+          MergeSource(
+            df = Dataset.ofRows(spark, source),
+            isMaterialized = false,
+            materializeReason = materializeReason
+          )
+        )
+        return
+      }
+
+      val referencedSourceColumns =
+        getReferencedSourceColumns(source, condition, matchedClauses, notMatchedClauses)
+      // When we materialize the source, we want to make sure that columns got pruned before caching.
+      val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
+      val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns)
+
+      // Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage,
+      // which shall ensure that the source cannot be recomputed and thus become inconsistent.
+      val checkpointedSourcePlanDF = baseSourcePlanDF
+        // Set eager=false for now, even if we should be doing eager, so that we can set the storage
+        // level before executing.
+        .localCheckpoint(eager = false)
+
+      // We have to reach through the crust and into the plan of the checkpointed DF
+      // to get the RDD that was actually checkpointed, to be able to unpersist it later...
+      var checkpointedPlan = checkpointedSourcePlanDF.queryExecution.analyzed
+      val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd
+      materializedSourceRDD = Some(rdd)
+      rdd.setName("mergeMaterializedSource")
+
+      // We should still keep the hints from the input plan.
+      checkpointedPlan = addHintsToPlan(source, checkpointedPlan)
+
+      mergeSource = Some(
+        MergeSource(
+          df = Dataset.ofRows(spark, checkpointedPlan),
+          isMaterialized = true,
+          materializeReason = materializeReason
+        )
+      )
+
+      // Sets appropriate StorageLevel
+      val storageLevel = StorageLevel.fromString(
+        if (attempt == 1) {
+          spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL)
+        } else {
+          // If it failed the first time, potentially use a different storage level on retry.
+          spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY)
+        }
+      )
+      rdd.persist(storageLevel)
+
+      // WARNING: if eager == false, the source used during the first Spark Job that uses this may
+      // still be inconsistent with source materialized afterwards.
+      // This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job
+      // that triggered the lazy checkpointing finished.
+      // If blocks were lost during that job, they may still get recomputed and changed compared
+      // to how they were used during the execution of the job.
+      if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) {
+        // Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here.
+        rdd
+          .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]])
+          .foreach((_: InternalRow) => ())
+        assert(rdd.isCheckpointed)
+      }
+
+      logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns.")
+      logDebug(s"Materialized MERGE source plan:\n${getMergeSource.df.queryExecution}")
+    } finally {
+      // --- modified start
+      CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(originalRemoveTopmostC2R, spark)
+      // --- modified end
+    }
+  }
+
+  /** Returns the prepared merge source. */
+  protected def getMergeSource: MergeSource = mergeSource match {
+    case Some(source) => source
+    case None =>
+      throw new IllegalStateException(
+        "mergeSource was not initialized! Call prepareMergeSource before.")
+  }
+
+  private def addHintsToPlan(sourcePlan: LogicalPlan, plan: LogicalPlan): LogicalPlan = {
+    val hints = EliminateResolvedHint.extractHintsFromPlan(sourcePlan)._2
+    // This follows similar code in CacheManager from https://github.com/apache/spark/pull/24580
+    if (hints.nonEmpty) {
+      // The returned hint list is in top-down order, we should create the hint nodes from
+      // right to left.
+      val planWithHints =
+        hints.foldRight[LogicalPlan](plan) {
+          case (hint, p) =>
+            ResolvedHint(p, hint)
+        }
+      planWithHints
+    } else {
+      plan
+    }
+  }
+}
+
+object MergeIntoMaterializeSource {
+  case class MergeSource(
+      df: DataFrame,
+      isMaterialized: Boolean,
+      materializeReason: MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) {
+    assert(
+      !isMaterialized ||
+        MergeIntoMaterializeSourceReason.MATERIALIZED_REASONS.contains(materializeReason))
+  }
+
+  // This depends on SparkCoreErrors.checkpointRDDBlockIdNotFoundError msg
+  def mergeMaterializedSourceRddBlockLostErrorRegex(rddId: Int): String =
+    s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*"
+
+  /** @return The columns of the source plan that are used in this MERGE */
+  private def getReferencedSourceColumns(
+      source: LogicalPlan,
+      condition: Expression,
+      matchedClauses: Seq[DeltaMergeIntoMatchedClause],
+      notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]) = {
+    val conditionCols = condition.references
+    val matchedCondCols = matchedClauses.flatMap(_.condition).flatMap(_.references)
+    val notMatchedCondCols = notMatchedClauses.flatMap(_.condition).flatMap(_.references)
+    val matchedActionsCols = matchedClauses
+      .flatMap(_.resolvedActions)
+      .flatMap(_.expr.references)
+    val notMatchedActionsCols = notMatchedClauses
+      .flatMap(_.resolvedActions)
+      .flatMap(_.expr.references)
+    val allCols = AttributeSet(
+      conditionCols ++
+        matchedCondCols ++
+        notMatchedCondCols ++
+        matchedActionsCols ++
+        notMatchedActionsCols)
+
+    source.output.filter(allCols.contains)
+  }
+}
+
+/** Enumeration with possible reasons that source may be materialized in a MERGE command. */
+object MergeIntoMaterializeSourceReason extends Enumeration {
+  type MergeIntoMaterializeSourceReason = Value
+  // It was determined to not materialize on auto config.
+  val NOT_MATERIALIZED_AUTO = Value("notMaterializedAuto")
+  // Config was set to never materialize source.
+  val NOT_MATERIALIZED_NONE = Value("notMaterializedNone")
+  // Insert only merge is single pass, no need for materialization
+  val NOT_MATERIALIZED_AUTO_INSERT_ONLY = Value("notMaterializedAutoInsertOnly")
+  // Config was set to always materialize source.
+  val MATERIALIZE_ALL = Value("materializeAll")
+  // The source query is considered non-deterministic, because it contains a non-delta scan.
+  val NON_DETERMINISTIC_SOURCE_NON_DELTA = Value("materializeNonDeterministicSourceNonDelta")
+  // The source query is considered non-deterministic, because it contains non-deterministic
+  // operators.
+  val NON_DETERMINISTIC_SOURCE_OPERATORS = Value("materializeNonDeterministicSourceOperators")
+  // Either spark configs to ignore unreadable files are set or the source plan contains relations
+  // with ignore unreadable files options.
+  val IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET =
+    Value("materializeIgnoreUnreadableFilesConfigsAreSet")
+  // The source query is considered non-deterministic because it contains a User Defined Function.
+  val NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF =
+    Value("materializeNonDeterministicSourceWithDeterministicUdf")
+  // Materialize when the configuration is invalid
+  val INVALID_CONFIG = Value("invalidConfigurationFailsafe")
+  // Catch-all case.
+  val UNKNOWN = Value("unknown")
+
+  // Set of reasons that result in source materialization.
+  final val MATERIALIZED_REASONS: Set[MergeIntoMaterializeSourceReason] = Set(
+    MATERIALIZE_ALL,
+    NON_DETERMINISTIC_SOURCE_NON_DELTA,
+    NON_DETERMINISTIC_SOURCE_OPERATORS,
+    IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET,
+    NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF,
+    INVALID_CONFIG
+  )
+}
+
+/**
+ * Structure with data for "delta.dml.merge.materializeSourceError" event. Note: We log only errors
+ * that we want to track (out of disk or lost RDD blocks).
+ */
+case class MergeIntoMaterializeSourceError(
+    errorType: String,
+    attempt: Int,
+    materializedSourceRDDStorageLevel: String
+)
+
+object MergeIntoMaterializeSourceError {
+  val OP_TYPE = "delta.dml.merge.materializeSourceError"
+}
+
+object MergeIntoMaterializeSourceErrorType extends Enumeration {
+  type MergeIntoMaterializeSourceError = Value
+  val RDD_BLOCK_LOST = Value("materializeSourceRDDBlockLostRetriesFailure")
+  val OUT_OF_DISK = Value("materializeSourceOutOfDiskFailure")
+}
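
Illustrative sketch (not part of this patch): the block-lost detection above is a plain
regex match over the exception message. A small example, with a hypothetical message
modeled on SparkCoreErrors.checkpointRDDBlockIdNotFoundError:

    import org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource._

    // (?s) lets "." cross line breaks, so a multi-line failure message still matches.
    val msg = "Job aborted due to stage failure:\n" +
      "Checkpoint block rdd_42_3 not found! Either the executor that originally " +
      "checkpointed this partition is no longer alive, or the original RDD is unpersisted."
    assert(msg.matches(mergeMaterializedSourceRddBlockLostErrorRegex(42)))  // same RDD id
    assert(!msg.matches(mergeMaterializedSourceRddBlockLostErrorRegex(7)))  // different RDD id
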
diff --git a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala
new file mode 100644
index 0000000000..ab9479ff66
--- /dev/null
+++ b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution._
+import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow
+
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class RDDScanSuite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "queries-output"
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  test("test RDDScanTransform") {
+    val test_sql =
+      """
+        |SELECT
+        |    l_returnflag,
+        |    l_linestatus,
+        |    sum(l_quantity) AS sum_qty
+        |FROM
+        |    lineitem
+        |WHERE
+        |    l_shipdate <= date'1998-09-02' - interval 1 day
+        |GROUP BY
+        |    l_returnflag,
+        |    l_linestatus
+        |""".stripMargin
+
+    val expectedAnswer = sql(test_sql).collect()
+
+    spark.sparkContext.setLocalProperty(
+      CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW,
+      "true")
+    val data = sql(test_sql)
+    val node = LogicalRDD.fromDataset(
+      rdd = data.queryExecution.toRdd,
+      originDataset = data,
+      isStreaming = false)
+
+    spark.sparkContext.setLocalProperty(
+      CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW,
+      "false")
+    val df = Dataset.ofRows(data.sparkSession, node).toDF()
+    checkAnswer(df, expectedAnswer)
+
+    var cnt = df.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true }
+    assertResult(1)(cnt.size)
+
+    val data2 = sql(test_sql)
+    val node2 = LogicalRDD.fromDataset(
+      rdd = data2.queryExecution.toRdd,
+      originDataset = data2,
+      isStreaming = false)
+
+    val df2 = Dataset.ofRows(data2.sparkSession, node2).toDF()
+    checkAnswer(df2, expectedAnswer)
+    cnt = df2.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true }
+    assertResult(1)(cnt.size)
+  }
+}
diff --git a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
index 1e9f0f9d77..f388ed8623 100644
--- a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
+++ b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.component
 
 import org.apache.gluten.backendsapi.clickhouse.CHBackend
 import org.apache.gluten.execution.OffloadKafkaScan
+import org.apache.gluten.extension.columnar.KafkaMiscColumnarRules.RemoveStreamingTopmostColumnarToRow
 import org.apache.gluten.extension.injector.Injector
 
 class CHKafkaComponent extends Component {
@@ -28,5 +29,6 @@ class CHKafkaComponent extends Component {
   override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil
   override def injectRules(injector: Injector): Unit = {
     OffloadKafkaScan.inject(injector)
+    injector.gluten.legacy.injectPost(c => RemoveStreamingTopmostColumnarToRow(c.session, c.caller.isStreaming()))
   }
 }
diff --git a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala
new file mode 100644
index 0000000000..38e163cb44
--- /dev/null
+++ b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.execution.MicroBatchScanExecTransformer
+import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+
+object KafkaMiscColumnarRules {
+  // Remove topmost columnar-to-row.
+  case class RemoveStreamingTopmostColumnarToRow(session: SparkSession, isStreamingPlan: Boolean)
+    extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = {
+      if (
+        !isStreamingPlan || plan.collectFirst { case e: MicroBatchScanExecTransformer => e }.isEmpty
+      ) {
+        return plan
+      }
+
+      plan match {
+        case ColumnarToRowLike(child) => wrapperFakeRowAdaptor(child)
+        case other => other
+      }
+    }
+
+    private def wrapperFakeRowAdaptor(plan: SparkPlan): SparkPlan = {
+      FakeRowAdaptor(plan)
+    }
+  }
+}
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
index d9006a098d..1885538f9b 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
@@ -74,6 +74,8 @@ public class BlockOutputStream implements Closeable {
 
   private native void nativeFlush(long instance);
 
+  public static native long directWrite(OutputStream stream, byte[] buf, int size, long block);
+
   public void write(ColumnarBatch cb) {
     if (cb.numCols() == 0 || cb.numRows() == 0) return;
     CHNativeBlock block = CHNativeBlock.fromColumnarBatch(cb);
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
index c8cce61b4f..8cf365335c 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
@@ -77,6 +77,12 @@ public class CHNativeBlock {
     return nativeBlockStats(blockAddress, columnPosition);
   }
 
+  public static native long copyBlock(long blockAddress);
+
+  public ColumnarBatch copyColumnarBatch() {
+    return new CHNativeBlock(copyBlock(blockAddress)).toColumnarBatch();
+  }
+
   public void close() {
     if (blockAddress != 0) {
       nativeClose(blockAddress);
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
index 3151adfceb..0e081a099a 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
@@ -49,6 +49,8 @@ public class CHStreamReader implements AutoCloseable {
 
   private native long nativeNext(long nativeShuffleReader);
 
+  public static native long directRead(InputStream inputStream, byte[] buffer, int bufferSize);
+
   public CHNativeBlock next() {
     long block = nativeNext(nativeShuffleReader);
     return new CHNativeBlock(block);
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 374e223ea9..411351e9a4 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -135,6 +135,7 @@ object CHRuleApi {
 
     // Gluten columnar: Post rules.
     injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
+    injector.injectPost(c => CHRemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
     SparkShimLoader.getSparkShims
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPost(c => intercept(each(c.session))))
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index c77e7bc31b..eecf0588d4 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -27,7 +27,7 @@ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
 import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
-import org.apache.gluten.vectorized.CHColumnarBatchSerializer
+import org.apache.gluten.vectorized.{BlockOutputStream, CHColumnarBatchSerializer, CHNativeBlock, CHStreamReader}
 
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.internal.Logging
@@ -58,6 +58,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import org.apache.commons.lang3.ClassUtils
 
+import java.io.{ObjectInputStream, ObjectOutputStream}
 import java.lang.{Long => JLong}
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
@@ -525,6 +526,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
             wrapChild(union)
           case ordered: TakeOrderedAndProjectExecTransformer =>
             wrapChild(ordered)
+          case rddScan: CHRDDScanTransformer =>
+            wrapChild(rddScan)
           case other =>
             throw new GlutenNotSupportException(
               s"Not supported operator ${other.nodeName} for BroadcastRelation")
@@ -965,4 +968,29 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
     case co: FlattenedOr => GenericExpressionTransformer(co.name, children, co)
     case _ => super.genFlattenedExpressionTransformer(substraitName, children, expr)
   }
+
+  override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true
+
+  override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+    CHRDDScanTransformer.replace(plan)
+
+  override def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
+    CHNativeBlock.fromColumnarBatch(batch).copyColumnarBatch()
+
+  override def serializeColumnarBatch(output: ObjectOutputStream, batch: ColumnarBatch): Unit = {
+    val writeBuffer: Array[Byte] =
+      new Array[Byte](CHBackendSettings.customizeBufferSize)
+    BlockOutputStream.directWrite(
+      output,
+      writeBuffer,
+      CHBackendSettings.customizeBufferSize,
+      CHNativeBlock.fromColumnarBatch(batch).blockAddress())
+  }
+
+  override def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch = {
+    val bufferSize = CHBackendSettings.customizeBufferSize
+    val readBuffer: Array[Byte] = new Array[Byte](bufferSize)
+    val address = CHStreamReader.directRead(input, readBuffer, bufferSize)
+    new CHNativeBlock(address).toColumnarBatch
+  }
 }
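
Illustrative sketch (not part of this patch): the two new overrides above form a
serialize/deserialize pair for ClickHouse-native batches. A hedged round-trip example
(the helper name roundTrip is ours; it assumes the CH native library is loaded and
`batch` wraps a native block):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.gluten.backendsapi.clickhouse.CHSparkPlanExecApi
    import org.apache.spark.sql.vectorized.ColumnarBatch

    def roundTrip(api: CHSparkPlanExecApi, batch: ColumnarBatch): ColumnarBatch = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      api.serializeColumnarBatch(out, batch) // BlockOutputStream.directWrite underneath
      out.flush()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      api.deserializeColumnarBatch(in)       // CHStreamReader.directRead rebuilds the block
    }
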
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala
new file mode 100644
index 0000000000..d05cbb28ce
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
+
+// Remove the topmost columnar-to-row conversion
+// Primarily for structured streaming and delta deletion vector
+//
+// Sometimes, the code uses dataFrame.queryExecution.toRdd as the data source.
+//   queryExecution will use columnar-to-row (c2r) and row-to-columnar (r2c)
+//   conversions for the next operation.
+// This rule aims to eliminate the redundant double conversion.
+case class CHRemoveTopmostColumnarToRow(session: SparkSession, isAdaptiveContext: Boolean)
+  extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    val removeTopmostColumnarToRow = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(session)
+
+    if (!removeTopmostColumnarToRow) {
+      return plan
+    }
+
+    plan match {
+      case shuffleExchangeLike @ ColumnarToRowLike(_: ShuffleExchangeLike) =>
+        shuffleExchangeLike
+      case broadcastExchangeLike @ ColumnarToRowLike(_: BroadcastExchangeLike) =>
+        broadcastExchangeLike
+      case broadcastQueryStageExec @ ColumnarToRowLike(_: BroadcastQueryStageExec) =>
+        broadcastQueryStageExec
+      case ColumnarToRowLike(child) => wrapperColumnarRowAdaptor(child)
+      case other => other
+    }
+  }
+
+  private def wrapperColumnarRowAdaptor(plan: SparkPlan): SparkPlan = {
+    FakeRowAdaptor(plan)
+  }
+}
+
+object CHRemoveTopmostColumnarToRow {
+  val REMOVE_TOPMOST_COLUMNAR_TO_ROW: String = "gluten.removeTopmostColumnarToRow"
+
+  def isRemoveTopmostC2R(spark: SparkSession): Boolean = {
+    Option(spark.sparkContext.getLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW)).exists(_.toBoolean)
+  }
+
+  def setRemoveTopmostC2R(value: Boolean, spark: SparkSession): Unit = {
+    spark.sparkContext.setLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW, value.toString)
+  }
+}
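
Illustrative sketch (not part of this patch): callers opt in through the thread-local
Spark property defined above, as MergeIntoMaterializeSource and RDDScanSuite do in this
commit. A minimal save/restore helper (the name withTopmostC2RRemoved is ours):

    import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow
    import org.apache.spark.sql.SparkSession

    def withTopmostC2RRemoved[T](spark: SparkSession)(body: => T): T = {
      val original = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(spark)
      try {
        spark.sparkContext.setLocalProperty(
          CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW, "true")
        body // e.g. consume dataFrame.queryExecution.toRdd without the extra C2R/R2C hop
      } finally {
        CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(original, spark)
      }
    }
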
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala
new file mode 100644
index 0000000000..70491b295e
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.SparkRowIterator
+import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.vectorized.{CHBlockConverterJniWrapper, CHNativeBlock}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources.FakeRow
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+case class CHRDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    name: String,
+    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
+    override val outputOrdering: Seq[SortOrder]
+) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) {
+
+  override protected def doValidateInternal(): ValidationResult = {
+    output
+      .foreach(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+    ValidationResult.succeeded
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val localSchema = this.schema
+    val fieldNames = output.map(ConverterUtils.genColumnNameWithExprId).toArray
+    val fieldTypes = output
+      .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable).toProtobuf.toByteArray)
+      .toArray
+
+    rdd.mapPartitions(
+      it => {
+        if (it.hasNext) {
+          val projection = UnsafeProjection.create(localSchema)
+
+          val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
+            var sample = false
+            var isBatch = false
+            var byteArrayIterator: Iterator[Array[Byte]] = _
+            private var last_address: Long = 0
+
+            override def hasNext: Boolean = {
+              if (isBatch || !sample) {
+                it.hasNext
+              } else {
+                if (last_address != 0) {
+                  CHBlockConverterJniWrapper.freeBlock(last_address)
+                  last_address = 0
+                }
+                byteArrayIterator.hasNext
+              }
+            }
+
+            override def next(): ColumnarBatch = {
+              if (isBatch) {
+                return it.next().asInstanceOf[FakeRow].batch
+              }
+
+              if (sample) {
+                val slice = byteArrayIterator.take(8192)
+                val sparkRowIterator = new SparkRowIterator(slice)
+                last_address = CHBlockConverterJniWrapper
+                  .convertSparkRowsToCHColumn(sparkRowIterator, fieldNames, fieldTypes)
+                return new CHNativeBlock(last_address).toColumnarBatch
+              }
+
+              sample = true
+              val data = it.next()
+              data match {
+                case row: FakeRow =>
+                  isBatch = true
+                  return row.batch
+                case _ =>
+                  byteArrayIterator = it.map {
+                    case u: UnsafeRow => u.getBytes
+                    case i: InternalRow => projection.apply(i).getBytes
+                  }
+
+                  // deal first block
+                  val bytes = data match {
+                    case u: UnsafeRow => u.getBytes
+                    case i: InternalRow => projection.apply(i).getBytes
+                  }
+
+                  val sparkRowIterator = new SparkRowIterator(Iterator.apply(bytes))
+                  last_address = CHBlockConverterJniWrapper
+                    .convertSparkRowsToCHColumn(sparkRowIterator, fieldNames, fieldTypes)
+                  new CHNativeBlock(last_address).toColumnarBatch
+              }
+            }
+
+          }
+          res
+        } else {
+          Iterator.empty
+        }
+      })
+  }
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
+    copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering)
+}
+
+object CHRDDScanTransformer {
+  def replace(rdd: RDDScanExec): RDDScanTransformer =
+    CHRDDScanTransformer(
+      rdd.output,
+      rdd.inputRDD,
+      rdd.nodeName,
+      rdd.outputPartitioning,
+      rdd.outputOrdering)
+}
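
Illustrative sketch (not part of this patch): when the incoming rows are not already
columnar FakeRows, the transformer above serializes each row to UnsafeRow bytes and
converts them natively in slices of up to 8192 rows. The per-row step uses stock Spark
APIs only:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
    val toUnsafe = UnsafeProjection.create(schema)
    val row = InternalRow(1L, UTF8String.fromString("a"))
    // The byte payload that SparkRowIterator hands to convertSparkRowsToCHColumn above.
    val bytes: Array[Byte] = toUnsafe(row).getBytes
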
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
index 02e5b34a37..2250f8bfe3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
@@ -563,7 +563,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
         | str_to_map('a,b', ',', ''),
         | str_to_map('a:c|b:c', '\\|', ':')
         |""".stripMargin
-    runQueryAndCompare(sql1, true, false)(checkGlutenOperatorMatch[ProjectExecTransformer])
+    runQueryAndCompare(sql1, true)(checkGlutenOperatorMatch[ProjectExecTransformer])
   }
 
   test("test parse_url") {
@@ -622,14 +622,12 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
         (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) {
       // Test codec with 'US-ASCII'
       runQueryAndCompare(
-        "SELECT decode(encode('Spark SQL', 'US-ASCII'), 'US-ASCII')",
-        noFallBack = false
+        "SELECT decode(encode('Spark SQL', 'US-ASCII'), 'US-ASCII')"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       // Test codec with 'UTF-16'
       runQueryAndCompare(
-        "SELECT decode(encode('Spark SQL', 'UTF-16'), 'UTF-16')",
-        noFallBack = false
+        "SELECT decode(encode('Spark SQL', 'UTF-16'), 'UTF-16')"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -645,8 +643,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
         (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) {
       runQueryAndCompare(
-        "select cast('7.921901' as float), cast('7.921901' as double)",
-        noFallBack = false
+        "select cast('7.921901' as float), cast('7.921901' as double)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -1061,8 +1058,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
         (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) {
       runQueryAndCompare(
-        "select cast(' \t2570852431\n' as long), cast('25708\t52431\n' as 
long)",
-        noFallBack = false
+        "select cast(' \t2570852431\n' as long), cast('25708\t52431\n' as 
long)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
index 0c4f865b26..c0dfbed7d1 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
@@ -109,7 +109,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select array() as x union all select array(123) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array: convert nullable to nullable 1") {
@@ -117,7 +117,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select array() as x union all select array(123, null) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array: convert nullable to nullable 2") {
@@ -125,7 +125,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select array() as x union all select array(null) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array: null array") {
@@ -133,7 +133,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select array(null)
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing map: convert nullable to non-nullable") {
@@ -141,7 +141,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select map() as x union all select map(123, 456) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing map: convert nullable to nullable 1") {
@@ -149,7 +149,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select map() as x union all select map(1, null, 2, 23) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array in map 1") {
@@ -157,7 +157,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select map(1, null) as x union all select map(1, array(456)) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array in map 2") {
@@ -165,7 +165,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select map(1, array()) as x union all select map(1, array(456)) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array in map 3") {
@@ -173,7 +173,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite
       """
         |select map(1, array()) as x union all select map(1, array(456, null)) as x
         |""".stripMargin
-    compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
 
   test("nothing array in shuffle") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
index 907a5323a3..38d45db098 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
@@ -187,8 +187,8 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   }
 
   test("array decimal32 CH column to row") {
-    compareResultsAgainstVanillaSpark("SELECT array(1.0, 2.0)", true, { _ => 
}, false)
-    compareResultsAgainstVanillaSpark("SELECT map(1.0, '2', 3.0, '4')", true, 
{ _ => }, false)
+    compareResultsAgainstVanillaSpark("SELECT array(1.0, 2.0)", true, { _ => })
+    compareResultsAgainstVanillaSpark("SELECT map(1.0, '2', 3.0, '4')", true, 
{ _ => })
   }
 
   test("array decimal32 spark row to CH column") {
@@ -281,8 +281,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
         """
           |select cast(map(1,'2') as string)
           |""".stripMargin,
-        true,
-        false
+        true
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -443,7 +442,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
         { _ => }
       )
       val q = "select cast(a as string) from (select array('123',NULL) as a)"
-      compareResultsAgainstVanillaSpark(q, true, { _ => }, false)
+      compareResultsAgainstVanillaSpark(q, true, { _ => })
     }
   }
 
@@ -505,8 +504,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
             |cast(map(array(1), map("aa", "123\'")) as string),
             |cast(named_struct("a", "test\'", "b", 1) as string),
             |cast(named_struct("a", "test\'", "b", 1, "c", struct("\'test"), 
"d", array('123\'')) as string)
-            |""".stripMargin,
-          noFallBack = false
+            |""".stripMargin
         )(checkGlutenOperatorMatch[ProjectExecTransformer])
       }
     }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 18db7e4070..19544e7f0a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -422,13 +422,11 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
 
       runQueryAndCompare(
         "select a from (select array_intersect(array(null,1,2,3,null), 
array(3,5,1,null,null)) as arr) " +
-          "lateral view explode(arr) as a order by a",
-        noFallBack = false
+          "lateral view explode(arr) as a order by a"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
-        "select array_intersect(array(null,1,2,3,null), cast(null as 
array<int>))",
-        noFallBack = false
+        "select array_intersect(array(null,1,2,3,null), cast(null as 
array<int>))"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
@@ -449,8 +447,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       runQueryAndCompare(
         "select array_position(array(1,2,3,null), 1), 
array_position(array(1,2,3,null), null)," +
           "array_position(array(1,2,3,null), 5), array_position(array(1,2,3), 
5), " +
-          "array_position(array(1,2,3), 2), array_position(cast(null as 
array<int>), 1)",
-        noFallBack = false
+          "array_position(array(1,2,3), 2), array_position(cast(null as 
array<int>), 1)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -465,8 +462,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       runQueryAndCompare(
         "select array_contains(array(1,2,3,null), 1), 
array_contains(array(1,2,3,null), " +
           "cast(null as int)), array_contains(array(1,2,3,null), 5), 
array_contains(array(1,2,3), 5)," +
-          "array_contains(array(1,2,3), 2), array_contains(cast(null as 
array<int>), 1)",
-        noFallBack = false
+          "array_contains(array(1,2,3), 2), array_contains(cast(null as 
array<int>), 1)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -483,8 +479,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
-        "select sort_array(array(1,3,2,null)), 
sort_array(array(1,2,3,null),false)",
-        noFallBack = false
+        "select sort_array(array(1,3,2,null)), 
sort_array(array(1,2,3,null),false)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -535,8 +530,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," 
+ NullPropagation.ruleName)) {
       runQueryAndCompare(
         "select find_in_set(null, 'a'), find_in_set('a', null), " +
-          "find_in_set('a', 'a,b'), find_in_set('a', 'ab,ab')",
-        noFallBack = false
+          "find_in_set('a', 'a,b'), find_in_set('a', 'ab,ab')"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -587,8 +581,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
         "select elt(2, n_comment, n_regionkey) from nation"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
       runQueryAndCompare(
-        "select elt(null, 'a', 'b'), elt(0, 'a', 'b'), elt(1, 'a', 'b'), 
elt(3, 'a', 'b')",
-        noFallBack = false
+        "select elt(null, 'a', 'b'), elt(0, 'a', 'b'), elt(1, 'a', 'b'), 
elt(3, 'a', 'b')"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -601,8 +594,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
       runQueryAndCompare(
         "select array_max(null), array_max(array(null)), array_max(array(1, 2, 
3, null)), " +
-          "array_max(array(1.0, 2.0, 3.0, null)), array_max(array('z', 't', 
'abc'))",
-        noFallBack = false
+          "array_max(array(1.0, 2.0, 3.0, null)), array_max(array('z', 't', 
'abc'))"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -615,8 +607,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
       runQueryAndCompare(
         "select array_min(null), array_min(array(null)), array_min(array(1, 2, 
3, null)), " +
-          "array_min(array(1.0, 2.0, 3.0, null)), array_min(array('z', 't', 
'abc'))",
-        noFallBack = false
+          "array_min(array(1.0, 2.0, 3.0, null)), array_min(array('z', 't', 
'abc'))"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -664,8 +655,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
 
       runQueryAndCompare(
         "select array_distinct(array(1,2,1,2,3)), 
array_distinct(array(null,1,null,1,2,null,3)), " +
-          "array_distinct(array(array(1,null,2), array(1,null,2))), 
array_distinct(null), array_distinct(array(null))",
-        noFallBack = false
+          "array_distinct(array(array(1,null,2), array(1,null,2))), 
array_distinct(null), array_distinct(array(null))"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -682,8 +672,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
           "array_union(array(null,1,null,1,2,null,3), 
array(1,null,2,null,3,null,4)), " +
           "array_union(array(array(1,null,2), array(2,null,3)), 
array(array(2,null,3), array(1,null,2))), " +
           "array_union(array(null), array(null)), " +
-          "array_union(cast(null as array<int>), cast(null as array<int>))",
-        noFallBack = false
+          "array_union(cast(null as array<int>), cast(null as array<int>))"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -698,8 +687,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
 
       runQueryAndCompare(
         "select shuffle(array(1,2,3,4,5)), shuffle(array(1,3,null,3,4)), 
shuffle(null)",
-        compareResult = false,
-        noFallBack = false
+        compareResult = false
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
@@ -1235,8 +1223,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," 
+ NullPropagation.ruleName)) {
       runQueryAndCompare(
         "select sequence(null, 1), sequence(1, null), sequence(1, 3, null), 
sequence(1, 5)," +
-          "sequence(5, 1), sequence(1, 5, 2), sequence(5, 1, -2)",
-        noFallBack = false
+          "sequence(5, 1), sequence(1, 5, 2), sequence(5, 1, -2)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
@@ -1935,8 +1922,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
       runQueryAndCompare(
         "select concat_ws(null), concat_ws('-'), concat_ws('-', null), 
concat_ws('-', null, null), " +
           "concat_ws(null, 'a'), concat_ws('-', 'a'), concat_ws('-', 'a', 
null), " +
-          "concat_ws('-', 'a', null, 'b', 'c', null, array(null), array('d', 
null), array('f', 'g'))",
-        noFallBack = false
+          "concat_ws('-', 'a', null, 'b', 'c', null, array(null), array('d', 
null), array('f', 'g'))"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
@@ -2195,8 +2181,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
     withSQLConf(
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," 
+ NullPropagation.ruleName)) {
       runQueryAndCompare(
-        "select 1/0f, 1/0.0d",
-        noFallBack = false
+        "select 1/0f, 1/0.0d"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
@@ -2439,8 +2424,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
     withSQLConf(
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," 
+ NullPropagation.ruleName)) {
       runQueryAndCompare(
-        "select trunc('2023-12-06', 'MM'), trunc('2023-12-06', 'YEAR'), 
trunc('2023-12-06', 'WEEK'), trunc('2023-12-06', 'QUARTER')",
-        noFallBack = false
+        "select trunc('2023-12-06', 'MM'), trunc('2023-12-06', 'YEAR'), 
trunc('2023-12-06', 'WEEK'), trunc('2023-12-06', 'QUARTER')"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
 
       runQueryAndCompare(
@@ -2465,8 +2449,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
     withSQLConf(
       SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," 
+ NullPropagation.ruleName)) {
       runQueryAndCompare(
-        "select cast('1' as boolean), cast('t' as boolean), cast('all' as 
boolean), cast('f' as boolean)",
-        noFallBack = false
+        "select cast('1' as boolean), cast('t' as boolean), cast('all' as 
boolean), cast('f' as boolean)"
       )(checkGlutenOperatorMatch[ProjectExecTransformer])
     }
   }
diff --git 
a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h 
b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
index de338ce6f0..d3cf296e56 100644
--- a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
+++ b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
@@ -99,7 +99,6 @@ public:
         auto & to_tuple = assert_cast<DB::ColumnTuple &>(to);
         auto & cardinality = assert_cast<DB::ColumnInt64 
&>(to_tuple.getColumn(0));
         auto & last = assert_cast<DB::ColumnInt64 &>(to_tuple.getColumn(1));
-        auto a = to_tuple.getColumn(2).getDataType();
         auto & bitmap = assert_cast<DB::ColumnString &>(to_tuple.getColumn(2));
         this->data(place).insertResultInto(cardinality, last, bitmap);
     }
diff --git a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp 
b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
index a6ce6fd408..18417fb3b7 100644
--- a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
+++ b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
@@ -57,7 +57,7 @@ ALWAYS_INLINE static void writeRowToColumns(const 
std::vector<MutableColumnPtr>
         {
             const StringRef str_ref{spark_row_reader.getStringRef(i)};
             if (str_ref.data == nullptr)
-                columns[i]->insertData(nullptr, str_ref.size);
+                columns[i]->insertDefault();
             else if (!spark_row_reader.isBigEndianInSparkRow(i))
                 columns[i]->insertData(str_ref.data, str_ref.size);
             else
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp 
b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
index 143fc19ae8..dcead78ce0 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
@@ -62,10 +62,10 @@ ShuffleReader::~ShuffleReader()
     input_stream.reset();
 }
 
-jclass ShuffleReader::input_stream_class = nullptr;
-jmethodID ShuffleReader::input_stream_read = nullptr;
+jclass ShuffleReader::shuffle_input_stream_class = nullptr;
+jmethodID ShuffleReader::shuffle_input_stream_read = nullptr;
 
-bool ReadBufferFromJavaInputStream::nextImpl()
+bool ReadBufferFromJavaShuffleInputStream::nextImpl()
 {
     int count = readFromJava();
     if (count > 0)
@@ -73,20 +73,20 @@ bool ReadBufferFromJavaInputStream::nextImpl()
     return count > 0;
 }
 
-int ReadBufferFromJavaInputStream::readFromJava() const
+int ReadBufferFromJavaShuffleInputStream::readFromJava() const
 {
     GET_JNIENV(env)
     jint count = safeCallIntMethod(
-        env, java_in, ShuffleReader::input_stream_read, 
reinterpret_cast<jlong>(internal_buffer.begin()), internal_buffer.size());
+        env, java_in, ShuffleReader::shuffle_input_stream_read, 
reinterpret_cast<jlong>(internal_buffer.begin()), internal_buffer.size());
     CLEAN_JNIENV
     return count;
 }
 
-ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject 
input_stream) : java_in(input_stream)
+ReadBufferFromJavaShuffleInputStream::ReadBufferFromJavaShuffleInputStream(jobject
 input_stream) : java_in(input_stream)
 {
 }
 
-ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream()
+ReadBufferFromJavaShuffleInputStream::~ReadBufferFromJavaShuffleInputStream()
 {
     GET_JNIENV(env)
     env->DeleteGlobalRef(java_in);
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.h 
b/cpp-ch/local-engine/Shuffle/ShuffleReader.h
index 721a157849..d1d36e176b 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleReader.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.h
@@ -31,7 +31,7 @@ class NativeReader;
 namespace local_engine
 {
 void configureCompressedReadBuffer(DB::CompressedReadBuffer & 
compressedReadBuffer);
-class ReadBufferFromJavaInputStream;
+class ReadBufferFromJavaShuffleInputStream;
 class ShuffleReader : BlockIterator
 {
 public:
@@ -39,8 +39,8 @@ public:
         std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64 
max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_);
     DB::Block * read();
     ~ShuffleReader();
-    static jclass input_stream_class;
-    static jmethodID input_stream_read;
+    static jclass shuffle_input_stream_class;
+    static jmethodID shuffle_input_stream_read;
 
 private:
     std::unique_ptr<DB::ReadBuffer> in;
@@ -52,11 +52,11 @@ private:
 };
 
 
-class ReadBufferFromJavaInputStream final : public 
DB::BufferWithOwnMemory<DB::ReadBuffer>
+class ReadBufferFromJavaShuffleInputStream final : public 
DB::BufferWithOwnMemory<DB::ReadBuffer>
 {
 public:
-    explicit ReadBufferFromJavaInputStream(jobject input_stream);
-    ~ReadBufferFromJavaInputStream() override;
+    explicit ReadBufferFromJavaShuffleInputStream(jobject input_stream);
+    ~ReadBufferFromJavaShuffleInputStream() override;
 
 private:
     jobject java_in;
diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp 
b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
index 7b016eb886..e2f94e7cb7 100644
--- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
+++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
@@ -25,17 +25,19 @@
 #include <IO/VarInt.h>
 #include <Storages/IO/AggregateSerializationUtils.h>
 #include <Storages/IO/NativeWriter.h>
+#include <jni/jni_common.h>
 #include <Common/Arena.h>
+#include <Common/JNIUtils.h>
 
 namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int INCORRECT_INDEX;
-    extern const int LOGICAL_ERROR;
-    extern const int CANNOT_READ_ALL_DATA;
-    extern const int INCORRECT_DATA;
-    extern const int TOO_LARGE_ARRAY_SIZE;
+extern const int INCORRECT_INDEX;
+extern const int LOGICAL_ERROR;
+extern const int CANNOT_READ_ALL_DATA;
+extern const int INCORRECT_DATA;
+extern const int TOO_LARGE_ARRAY_SIZE;
 }
 }
 
@@ -68,10 +70,8 @@ DB::Block NativeReader::read()
 
     /// Append small blocks into a large one, could reduce memory allocations 
overhead.
     while (result_block.rows() < max_block_size && result_block.bytes() < 
max_block_bytes)
-    {
         if (!appendNextBlock(result_block))
             break;
-    }
 
     if (result_block.rows())
     {
@@ -85,7 +85,8 @@ DB::Block NativeReader::read()
     return result_block;
 }
 
-static void readFixedSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & 
column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
+static void
+readFixedSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t 
rows, NativeReader::ColumnParseUtil & column_parse_util)
 {
     ColumnAggregateFunction & real_column = 
typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
     auto & arena = real_column.createOrGetArena();
@@ -102,7 +103,8 @@ static void readFixedSizeAggregateData(DB::ReadBuffer &in, 
DB::ColumnPtr & colum
     }
 }
 
-static void readVarSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & 
column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
+static void
+readVarSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t 
rows, NativeReader::ColumnParseUtil & column_parse_util)
 {
     ColumnAggregateFunction & real_column = 
typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
     auto & arena = real_column.createOrGetArena();
@@ -119,7 +121,8 @@ static void readVarSizeAggregateData(DB::ReadBuffer &in, 
DB::ColumnPtr & column,
     }
 }
 
-static void readNormalSimpleData(DB::ReadBuffer &in, DB::ColumnPtr & column, 
size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
+static void
+readNormalSimpleData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t rows, 
NativeReader::ColumnParseUtil & column_parse_util)
 {
     ISerialization::DeserializeBinaryBulkSettings settings;
     settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { 
return &in; };
@@ -146,7 +149,7 @@ readNormalComplexData(DB::ReadBuffer & in, DB::ColumnPtr & 
column, size_t rows,
     ISerialization::DeserializeBinaryBulkStatePtr state;
 
     DB::ColumnPtr new_col = column->cloneResized(0);
-    column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings, 
state ,nullptr);
+    column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings, 
state, nullptr);
     
column_parse_util.serializer->deserializeBinaryBulkWithMultipleStreams(new_col, 
0, rows, settings, state, nullptr);
     column->assumeMutable()->insertRangeFrom(*new_col, 0, new_col->size());
 }
@@ -201,7 +204,7 @@ DB::Block NativeReader::prepareByFirstBlock()
         /// Data
         ColumnPtr read_column = column.type->createColumn(*serialization);
 
-        if (rows)    /// If no rows, nothing to read.
+        if (rows) /// If no rows, nothing to read.
         {
             if (is_agg_state_type && agg_opt_column)
             {
@@ -272,4 +275,36 @@ bool NativeReader::appendNextBlock(DB::Block & 
result_block)
     return true;
 }
 
+jclass ReadBufferFromJavaInputStream::input_stream_class = nullptr;
+jmethodID ReadBufferFromJavaInputStream::input_stream_read = nullptr;
+
+bool ReadBufferFromJavaInputStream::nextImpl()
+{
+    int count = readFromJava();
+    if (count > 0)
+        working_buffer.resize(count);
+    return count > 0;
+}
+
+int ReadBufferFromJavaInputStream::readFromJava() const
+{
+    GET_JNIENV(env)
+    jint count = safeCallIntMethod(env, input_stream, input_stream_read, 
buffer);
+
+    if (count > 0)
+        env->GetByteArrayRegion(buffer, 0, count, reinterpret_cast<jbyte 
*>(internal_buffer.begin()));
+
+    CLEAN_JNIENV
+    return count;
+}
+
+ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject 
input_stream_, jbyteArray buffer_, const size_t buffer_size_)
+    : input_stream(input_stream_), buffer(buffer_), buffer_size(buffer_size_)
+{
+}
+
+ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream()
+{
+}
+
 }
diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.h 
b/cpp-ch/local-engine/Storages/IO/NativeReader.h
index 4e85526e7d..bf8246a623 100644
--- a/cpp-ch/local-engine/Storages/IO/NativeReader.h
+++ b/cpp-ch/local-engine/Storages/IO/NativeReader.h
@@ -16,10 +16,12 @@
  */
 #pragma once
 
-#include <Common/PODArray.h>
+#include <jni.h>
 #include <Core/Block.h>
 #include <Core/Defines.h>
 #include <DataTypes/DataTypeAggregateFunction.h>
+#include <IO/BufferWithOwnMemory.h>
+#include <IO/ReadBuffer.h>
 
 namespace local_engine
 {
@@ -34,14 +36,13 @@ public:
         std::string name;
         DB::SerializationPtr serializer = nullptr;
         size_t avg_value_size_hint = 0;
-        
+
         // for aggregate data
         size_t aggregate_state_size = 0;
         size_t aggregate_state_align = 0;
         DB::AggregateFunctionPtr aggregate_function = nullptr;
 
         std::function<void(DB::ReadBuffer &, DB::ColumnPtr &, size_t, 
ColumnParseUtil &)> parse;
-
     };
 
     NativeReader(
@@ -72,4 +73,21 @@ private:
     bool appendNextBlock(DB::Block & result_block);
 };
 
+class ReadBufferFromJavaInputStream final : public 
DB::BufferWithOwnMemory<DB::ReadBuffer>
+{
+public:
+    static jclass input_stream_class;
+    static jmethodID input_stream_read;
+
+    explicit ReadBufferFromJavaInputStream(jobject input_stream_, jbyteArray 
buffer_, size_t buffer_size_);
+    ~ReadBufferFromJavaInputStream() override;
+
+private:
+    jobject input_stream;
+    size_t buffer_size;
+    jbyteArray buffer;
+    int readFromJava() const;
+    bool nextImpl() override;
+};
+
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
index 926c3ab215..2997e990b5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
@@ -91,10 +91,10 @@ void DeltaWriter::writeDeletionVector(const DB::Block & 
block)
     for (size_t row_idx = 0; row_idx < block.rows(); row_idx++)
     {
         const auto file_path = file_path_columns.column->getDataAt(row_idx);
-        auto bitmap = bitmap_columns.column->getDataAt(row_idx);
+        auto bitmap = bitmap_columns.column->getDataAt(row_idx).toString();
         auto cardinality = cardinality_src_columns.column->get64(row_idx); // 
alias deletedRowIndexCount
 
-        if (size_of_current_bin > 0 && bitmap.size + size_of_current_bin > 
packing_target_size)
+        if (size_of_current_bin > 0 && bitmap.length() + size_of_current_bin > 
packing_target_size)
         {
             write_buffer->finalize();
             write_buffer = nullptr;
@@ -120,7 +120,7 @@ void DeltaWriter::writeDeletionVector(const DB::Block & 
block)
                 {
                     DeltaDVRoaringBitmapArray existing_bitmap
                         = 
deserializeExistingBitmap(existing_path_or_inline_dv, existing_offset, 
existing_size_in_bytes, table_path);
-                    existing_bitmap.merge(bitmap.toString());
+                    existing_bitmap.merge(bitmap);
                     bitmap = existing_bitmap.serialize();
                     cardinality = existing_bitmap.cardinality();
                 }
@@ -140,12 +140,13 @@ void DeltaWriter::writeDeletionVector(const DB::Block & 
block)
         if (!write_buffer)
             initBinPackage();
 
-        size_of_current_bin = size_of_current_bin + bitmap.size;
-        Int32 bitmap_size = static_cast<Int32>(bitmap.size);
+        Int32 bitmap_size = static_cast<Int32>(bitmap.length());
+        size_of_current_bin = size_of_current_bin + bitmap.length();
+
         DB::writeBinaryBigEndian(bitmap_size, *write_buffer);
 
-        write_buffer->write(bitmap.data, bitmap.size);
-        Int32 checksum_value = static_cast<Int32>(crc32_z(0L, 
reinterpret_cast<const unsigned char *>(bitmap.data), bitmap_size));
+        write_buffer->write(bitmap.c_str(), bitmap_size);
+        Int32 checksum_value = static_cast<Int32>(crc32_z(0L, 
reinterpret_cast<const unsigned char *>(bitmap.c_str()), bitmap_size));
         DB::writeBinaryBigEndian(checksum_value, *write_buffer);
 
         auto dv_descriptor_field
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
index f6d7372ff4..5a08a33169 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
@@ -64,7 +64,7 @@ private:
 
     void initBinPackage();
 
-    const DB::ContextPtr & context;
+    DB::ContextPtr context;
     const String table_path;
     const size_t prefix_length;
     const size_t packing_target_size;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index e3ac674360..13b7cf2b54 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -121,7 +121,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
     block_stats_class = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/BlockStats;");
     block_stats_constructor = local_engine::GetMethodID(env, 
block_stats_class, "<init>", "(JZ)V");
 
-    local_engine::ShuffleReader::input_stream_class
+    local_engine::ShuffleReader::shuffle_input_stream_class
         = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/ShuffleInputStream;");
     local_engine::NativeSplitter::iterator_class
         = local_engine::CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/IteratorWrapper;");
@@ -134,8 +134,13 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
     local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next
         = local_engine::GetMethodID(env, 
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, 
"next", "()[B");
 
-    local_engine::ShuffleReader::input_stream_read
-        = local_engine::GetMethodID(env, 
local_engine::ShuffleReader::input_stream_class, "read", "(JJ)J");
+    local_engine::ReadBufferFromJavaInputStream::input_stream_class
+        = local_engine::CreateGlobalClassReference(env, 
"Ljava/io/InputStream;");
+    local_engine::ReadBufferFromJavaInputStream::input_stream_read
+        = local_engine::GetMethodID(env, 
local_engine::ReadBufferFromJavaInputStream::input_stream_class, "read", 
"([B)I");
+
+    local_engine::ShuffleReader::shuffle_input_stream_read
+        = local_engine::GetMethodID(env, 
local_engine::ShuffleReader::shuffle_input_stream_class, "read", "(JJ)J");
 
     local_engine::NativeSplitter::iterator_has_next
         = local_engine::GetMethodID(env, 
local_engine::NativeSplitter::iterator_class, "hasNext", "()Z");
@@ -205,7 +210,7 @@ JNIEXPORT void 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
     env->DeleteGlobalRef(block_stripes_class);
     env->DeleteGlobalRef(split_result_class);
     env->DeleteGlobalRef(block_stats_class);
-    env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class);
+    
env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class);
     env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class);
     
env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class);
     
env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class);
@@ -536,12 +541,24 @@ 
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, j
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
 
+JNIEXPORT jlong
+Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env, 
jobject obj, jlong block_address)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+
+    auto copied_block = block->cloneWithColumns(block->getColumns());
+    auto a = new DB::Block(copied_block);
+    return reinterpret_cast<jlong>(a);
+    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
 JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
     JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, 
jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     auto * input = env->NewGlobalRef(input_stream);
-    auto read_buffer = 
std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input);
+    auto read_buffer = 
std::make_unique<local_engine::ReadBufferFromJavaShuffleInputStream>(input);
     auto * shuffle_reader
         = new local_engine::ShuffleReader(std::move(read_buffer), compressed, 
max_shuffle_read_rows, max_shuffle_read_bytes);
     return reinterpret_cast<jlong>(shuffle_reader);
@@ -557,6 +574,19 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHStreamReader_nativeNext(JNIE
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
+JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_directRead(
+    JNIEnv * env, jclass /*clazz*/, jobject input_stream, jbyteArray buffer, 
jint buffer_size)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    // auto * input = env->NewGlobalRef(input_stream);
+    auto rb = 
std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input_stream, 
buffer, buffer_size);
+    auto reader = std::make_unique<local_engine::NativeReader>(*rb);
+    DB::Block block = reader->read();
+    DB::Block * res = new DB::Block(block);
+    return reinterpret_cast<jlong>(res);
+    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIEnv * env, 
jobject /*obj*/, jlong shuffle_reader)
 {
     LOCAL_ENGINE_JNI_METHOD_START
@@ -1192,6 +1222,25 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
+JNIEXPORT long Java_org_apache_gluten_vectorized_BlockOutputStream_directWrite(
+    JNIEnv * env,
+    jclass,
+    jobject output_stream,
+    jbyteArray buffer,
+    jint customize_buffer_size,
+    jlong block_address)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+    auto wb = 
std::make_shared<local_engine::WriteBufferFromJavaOutputStream>(output_stream, 
buffer, customize_buffer_size);
+    auto native_writer = std::make_unique<local_engine::NativeWriter>(*wb, 
block->cloneEmpty());
+    auto write_size = native_writer->write(*block);
+    native_writer->flush();
+    wb->finalize();
+    return write_size;
+    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, 
jobject, jlong instance)
 {
     LOCAL_ENGINE_JNI_METHOD_START
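
The new JNI entry points above (copyBlock, directRead, directWrite) imply JVM-side native declarations roughly of the following shape. This is a hedged sketch inferred only from the mangled symbol names and C signatures in this hunk; the actual declarations live in the org.apache.gluten.vectorized wrapper classes (CHNativeBlock, CHStreamReader, BlockOutputStream), whose Java sources are not shown here, and the trait and member names below are illustrative only.

    // Hedged sketch, not the real wrapper API: shapes inferred from the JNI signatures above.
    trait NativeBlockIoSketch {
      // CHNativeBlock.copyBlock: deep-copies the DB::Block at blockAddress, returns the new block's address.
      def copyBlock(blockAddress: Long): Long
      // CHStreamReader.directRead: reads one serialized block from a java.io.InputStream through a
      // reusable byte[] buffer and returns the address of the decoded DB::Block.
      def directRead(in: java.io.InputStream, buffer: Array[Byte], bufferSize: Int): Long
      // BlockOutputStream.directWrite: serializes the block at blockAddress into the OutputStream
      // and returns the size reported by the native writer.
      def directWrite(out: java.io.OutputStream, buffer: Array[Byte], bufferSize: Int, blockAddress: Long): Long
    }
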
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
index d23d172e46..1307351a43 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.extension.caller
 
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.streaming.StreamExecution
 
 /**
  * Helper API that stores information about the call site of the columnar 
rule. Specific columnar
@@ -28,6 +29,7 @@ import 
org.apache.spark.sql.execution.columnar.InMemoryRelation
 trait CallerInfo {
   def isAqe(): Boolean
   def isCache(): Boolean
+  def isStreaming(): Boolean
 }
 
 object CallerInfo {
@@ -36,7 +38,11 @@ object CallerInfo {
       override def initialValue(): Option[CallerInfo] = None
     }
 
-  private class Impl(override val isAqe: Boolean, override val isCache: 
Boolean) extends CallerInfo
+  private class Impl(
+      override val isAqe: Boolean,
+      override val isCache: Boolean,
+      override val isStreaming: Boolean
+  ) extends CallerInfo
 
   /*
    * Find the information about the caller that initiated the rule call.
@@ -46,7 +52,10 @@ object CallerInfo {
       return localStorage.get().get
     }
     val stack = Thread.currentThread.getStackTrace
-    new Impl(isAqe = inAqeCall(stack), isCache = inCacheCall(stack))
+    new Impl(
+      isAqe = inAqeCall(stack),
+      isCache = inCacheCall(stack),
+      isStreaming = inStreamingCall(stack))
   }
 
   private def inAqeCall(stack: Seq[StackTraceElement]): Boolean = {
@@ -57,10 +66,15 @@ object CallerInfo {
     stack.exists(_.getClassName.equals(InMemoryRelation.getClass.getName))
   }
 
+  private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = {
+    
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
+  }
+
   /** For testing only. */
-  def withLocalValue[T](isAqe: Boolean, isCache: Boolean)(body: => T): T = {
+  def withLocalValue[T](isAqe: Boolean, isCache: Boolean, isStreaming: Boolean 
= false)(
+      body: => T): T = {
     val prevValue = localStorage.get()
-    val newValue = new Impl(isAqe, isCache)
+    val newValue = new Impl(isAqe, isCache, isStreaming)
     localStorage.set(Some(newValue))
     try {
       body
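
The isStreaming flag added above lets columnar rules detect when the rule call originates from structured streaming's StreamExecution, alongside the existing AQE and cache detection. A hedged usage sketch of the extended test helper; the body is illustrative only:

    // Hedged sketch: thread the streaming flag through the thread-local caller info in a test.
    CallerInfo.withLocalValue(isAqe = false, isCache = false, isStreaming = true) {
      // Any columnar rule evaluated inside this block observes isStreaming == true.
      runStreamingPlanAssertions() // hypothetical test body
    }
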
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index c6672cdbb2..1bb5a255f5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.hive.HiveUDFTransformer
 import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import java.io.{ObjectInputStream, ObjectOutputStream}
 import java.lang.{Long => JLong}
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
@@ -727,4 +728,18 @@ trait SparkPlanExecApi {
       children: Seq[ExpressionTransformer],
       expr: Expression): ExpressionTransformer =
     GenericExpressionTransformer(substraitName, children, expr)
+
+  def isSupportRDDScanExec(plan: RDDScanExec): Boolean = false
+
+  def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+    throw new GlutenNotSupportException("RDDScanExec is not supported")
+
+  def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
+    throw new GlutenNotSupportException("Copying ColumnarBatch is not 
supported")
+
+  def serializeColumnarBatch(output: ObjectOutputStream, batch: 
ColumnarBatch): Unit =
+    throw new GlutenNotSupportException("Serialize ColumnarBatch is not 
supported")
+
+  def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch =
+    throw new GlutenNotSupportException("Deserialize ColumnarBatch is not 
supported")
 }
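
The hooks added above give a backend an opt-in path for offloading RDDScanExec and for copying/serializing its ColumnarBatch representation; the defaults throw GlutenNotSupportException. Below is a hedged caller-side sketch using only members visible in this diff, assuming RDDScanTransformer is a SparkPlan (it overrides doExecute and children in the new file later in this diff); the real dispatch goes through RDDScanTransformer's companion object and the offload rule below.

    // Hedged sketch: keep the vanilla RDDScanExec when the active backend opts out.
    import org.apache.gluten.backendsapi.BackendsApiManager
    import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}

    object RddScanOffloadSketch {
      def offloadIfSupported(plan: RDDScanExec): SparkPlan = {
        val api = BackendsApiManager.getSparkPlanExecApiInstance
        if (api.isSupportRDDScanExec(plan)) api.getRDDScanTransform(plan) else plan
      }
    }
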
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 611b2a8ff7..750fc060cd 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, 
BuildRight, BuildSide
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.CollectLimitExec
+import org.apache.spark.sql.execution.RDDScanTransformer
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -348,6 +349,9 @@ object OffloadOthers {
             plan.limit,
             plan.child
           )
+        case plan: RDDScanExec if 
RDDScanTransformer.isSupportRDDScanExec(plan) =>
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
+          RDDScanTransformer.getRDDScanTransform(plan)
         case p if !p.isInstanceOf[GlutenPlan] =>
           logDebug(s"Transformation for ${p.getClass} is currently not 
supported.")
           p
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala
new file mode 100644
index 0000000000..e3fc728477
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+
+abstract class RDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
+    override val outputOrdering: Seq[SortOrder]
+) extends ValidatablePlan {
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+  override def output: Seq[Attribute] = {
+    outputAttributes
+  }
+
+  override protected def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support 
doExecute().")
+  }
+
+  override def children: Seq[SparkPlan] = Seq.empty
+}
+
+object RDDScanTransformer {
+  def isSupportRDDScanExec(plan: RDDScanExec): Boolean =
+    BackendsApiManager.getSparkPlanExecApiInstance.isSupportRDDScanExec(plan)
+
+  def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+    BackendsApiManager.getSparkPlanExecApiInstance.getRDDScanTransform(plan)
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala
new file mode 100644
index 0000000000..019100234d
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+class FakeRowEnhancement(@transient private val _batch: ColumnarBatch)
+  extends FakeRow(_batch)
+  with Serializable {
+
+  override def copy(): InternalRow = {
+    val copied = 
BackendsApiManager.getSparkPlanExecApiInstance.copyColumnarBatch(batch)
+    new FakeRowEnhancement(copied)
+  }
+
+  @throws(classOf[IOException])
+  private def writeObject(output: ObjectOutputStream): Unit = {
+    
BackendsApiManager.getSparkPlanExecApiInstance.serializeColumnarBatch(output, 
batch)
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(input: ObjectInputStream): Unit = {
+    batch = 
BackendsApiManager.getSparkPlanExecApiInstance.deserializeColumnarBatch(input)
+  }
+}
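
FakeRowEnhancement makes the row wrapper Java-serializable by routing the wrapped ColumnarBatch through the backend's serialize/deserialize hooks; combined with the FakeRow change at the end of this diff (batch becomes a @transient var), readObject can reassign the batch after deserialization. A hedged round-trip sketch using plain Java serialization; the object and method names are illustrative:

    // Hedged sketch: standard Java serialization exercises the custom writeObject/readObject above.
    import org.apache.spark.sql.execution.datasources.FakeRowEnhancement

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    object FakeRowRoundTripSketch {
      def roundTrip(row: FakeRowEnhancement): FakeRowEnhancement = {
        val bytes = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bytes)
        oos.writeObject(row) // delegates the batch to the backend's serializeColumnarBatch
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        ois.readObject().asInstanceOf[FakeRowEnhancement] // restores the batch via deserializeColumnarBatch
      }
    }
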
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 54b5a34639..d58026b123 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -64,7 +64,7 @@ case class FakeRowAdaptor(child: SparkPlan)
   override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
 
   override protected def doExecute(): RDD[InternalRow] = {
-    doExecuteColumnar().map(cb => new FakeRow(cb))
+    doExecuteColumnar().map(cb => new FakeRowEnhancement(cb))
   }
 
   override def outputOrdering: Seq[SortOrder] = child match {
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 5601c2d321..0e4666c604 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -483,6 +483,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("column stats collection for null columns")
     .exclude("store and retrieve column stats in different time zones")
     .excludeGlutenTest("store and retrieve column stats in different time 
zones")
+    .excludeCH("statistics collection of a table with zero column")
   enableSuite[GlutenStringFunctionsSuite]
     .exclude("string regex_replace / regex_extract")
     .exclude("string overlay function")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 3436be6f85..af8c0ac201 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1788,6 +1788,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .includeCH("SPARK-16371 Do not push down filters when inner name and outer 
name are the same")
     .exclude("filter pushdown - StringPredicate")
     .excludeCH("filter pushdown - StringContains")
+    .excludeCH("SPARK-36866: filter pushdown - year-month interval")
   // avoid Velox compile error
   enableSuite(
     "org.apache.gluten.execution.parquet.GlutenParquetV1FilterSuite2"
@@ -1809,6 +1810,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .includeCH("SPARK-16371 Do not push down filters when inner name and outer 
name are the same")
     .exclude("filter pushdown - StringPredicate")
     .excludeCH("filter pushdown - StringContains")
+    .excludeCH("SPARK-36866: filter pushdown - year-month interval")
   enableSuite[GlutenParquetV1PartitionDiscoverySuite]
     .excludeCH("Various partition value types")
     .excludeCH("Various inferred partition value types")
@@ -2049,6 +2051,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .excludeCH("store and retrieve column stats in different time zones")
     .excludeCH("SPARK-42777: describe column stats (min, max) for 
timestamp_ntz column")
     .excludeCH("Gluten - store and retrieve column stats in different time 
zones")
+    .excludeCH("statistics collection of a table with zero column")
   enableSuite[GlutenStringExpressionsSuite]
     .excludeCH("StringComparison")
     .excludeCH("Substring")
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
index 0ffd832f3b..679a69334a 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
+++ 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
@@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.{CalendarInterval, 
UTF8String}
 
 trait IFakeRowAdaptor
 
-class FakeRow(val batch: ColumnarBatch) extends InternalRow {
+class FakeRow(@transient var batch: ColumnarBatch) extends InternalRow {
   override def numFields: Int = throw new UnsupportedOperationException()
 
   override def setNullAt(i: Int): Unit = throw new 
UnsupportedOperationException()

