This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

The following commit(s) were added to refs/heads/main by this push:
     new 65ad57add7 [Gluten-9254][CH] Support RDDScanExec (#9270)
65ad57add7 is described below

commit 65ad57add72e665b4e07b2d994347c9881d9fc13
Author: Shuai li <loney...@live.cn>
AuthorDate: Tue Apr 15 16:35:27 2025 +0800

    [Gluten-9254][CH] Support RDDScanExec (#9270)

    * [Gluten-9254][CH] Support RDDScanExec
---
 .../merge/MergeIntoMaterializeSource.scala | 499 +++++++++++++++++++++
 .../apache/spark/sql/execution/RDDScanSuite.scala | 86 ++++
 .../apache/gluten/component/CHKafkaComponent.scala | 2 +
 .../columnar/KafkaMiscColumnarRules.scala | 47 ++
 .../gluten/vectorized/BlockOutputStream.java | 2 +
 .../apache/gluten/vectorized/CHNativeBlock.java | 6 +
 .../apache/gluten/vectorized/CHStreamReader.java | 2 +
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 +
 .../clickhouse/CHSparkPlanExecApi.scala | 30 +-
 .../extension/CHRemoveTopmostColumnarToRow.scala | 71 +++
 .../spark/sql/execution/CHRDDScanTransformer.scala | 133 ++++++
 .../execution/GlutenFunctionValidateSuite.scala | 14 +-
 .../gluten/execution/GlutenNothingValueCheck.scala | 18 +-
 .../GlutenClickhouseFunctionSuite.scala | 12 +-
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 51 +--
 .../AggregateFunctionDVRoaringBitmap.h | 1 -
 cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp | 2 +-
 cpp-ch/local-engine/Shuffle/ShuffleReader.cpp | 14 +-
 cpp-ch/local-engine/Shuffle/ShuffleReader.h | 12 +-
 cpp-ch/local-engine/Storages/IO/NativeReader.cpp | 59 ++-
 cpp-ch/local-engine/Storages/IO/NativeReader.h | 24 +-
 .../Storages/SubstraitSource/Delta/DeltaWriter.cpp | 15 +-
 .../Storages/SubstraitSource/Delta/DeltaWriter.h | 2 +-
 cpp-ch/local-engine/local_engine_jni.cpp | 59 ++-
 .../gluten/extension/caller/CallerInfo.scala | 22 +-
 .../gluten/backendsapi/SparkPlanExecApi.scala | 15 +
 .../columnar/offload/OffloadSingleNodeRules.scala | 4 +
 .../spark/sql/execution/RDDScanTransformer.scala | 52 +++
 .../execution/datasources/FakeRowEnhancement.scala | 44 ++
 .../datasources/GlutenWriterColumnarRules.scala | 2 +-
 .../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
 .../utils/clickhouse/ClickHouseTestSettings.scala | 3 +
 .../spark/sql/execution/datasources/FakeRow.scala | 2 +-
 33 files changed, 1198 insertions(+), 109 deletions(-)

diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala new file mode 100644 index 0000000000..f1efab98df --- /dev/null +++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.delta.commands.merge + +import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow +import org.apache.spark.SparkException +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression} +import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils +import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog} +import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf._ +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.storage.StorageLevel + +import scala.annotation.tailrec +import scala.util.control.NonFatal + +/** + * Trait with logic and utilities used for materializing a snapshot of MERGE source in case we can't + * guarantee deterministic repeated reads from it. + * + * We materialize source if it is not safe to assume that it's deterministic (override with + * MERGE_SOURCE_MATERIALIZATION). Otherwise, if source changes between the phases of the MERGE, it + * can produce wrong results. We use local checkpointing for the materialization, which saves the + * source as a materialized RDD[InternalRow] on the executor local disks. + * + * 1st concern is that if an executor is lost, this data can be lost. When Spark executor + * decommissioning API is used, it should attempt to move this materialized data safely out before + * removing the executor. + * + * 2nd concern is that if an executor is lost for another reason (e.g. spot kill), we will still + * lose that data. To mitigate that, we implement a retry loop. The whole Merge operation needs to + * be restarted from the beginning in this case. When we retry, we increase the replication level of + * the materialized data from 1 to 2. (override with + * MERGE_SOURCE_MATERIALIZATION_RDD_STORAGE_LEVEL_RETRY). If it still fails after the maximum number + * of attempts (MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS), we record the failure for tracking purposes. + * + * 3rd concern is that executors run out of disk space with the extra materialization. We record + * such failures for tracking purposes. + */ +trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { + + import MergeIntoMaterializeSource._ + + /** + * Prepared Dataframe with source data. If needed, it is materialized, @see prepareMergeSource + */ + private var mergeSource: Option[MergeSource] = None + + /** If the source was materialized, reference to the checkpointed RDD. */ + protected var materializedSourceRDD: Option[RDD[InternalRow]] = None + + /** Track which attempt or retry it is in runWithMaterializedSourceAndRetries */ + protected var attempt: Int = 0 + + /** + * Run the Merge with retries in case it detects an RDD block lost error of the materialized + * source RDD. It will also record out of disk error, if such happens - possibly because of + * increased disk pressure from the materialized source RDD. 
+ */ + protected def runWithMaterializedSourceLostRetries( + spark: SparkSession, + deltaLog: DeltaLog, + metrics: Map[String, SQLMetric], + runMergeFunc: SparkSession => Seq[Row]): Seq[Row] = { + var doRetry = false + var runResult: Seq[Row] = null + attempt = 1 + do { + doRetry = false + metrics.values.foreach(_.reset()) + try { + runResult = runMergeFunc(spark) + } catch { + case NonFatal(ex) => + val isLastAttempt = + attempt == spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS) + handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match { + case RetryHandling.Retry => + logInfo(s"Retrying MERGE with materialized source. Attempt $attempt failed.") + doRetry = true + attempt += 1 + case RetryHandling.ExhaustedRetries => + logError( + s"Exhausted retries after $attempt attempts in MERGE with" + + s" materialized source. Logging latest exception.", + ex) + throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge + case RetryHandling.RethrowException => + logError(s"Fatal error in MERGE with materialized source in attempt $attempt.", ex) + throw ex + } + } finally { + // Remove source from RDD cache (noop if wasn't cached) + materializedSourceRDD.foreach(rdd => rdd.unpersist()) + materializedSourceRDD = None + mergeSource = None + } + } while (doRetry) + + runResult + } + + object RetryHandling extends Enumeration { + type Result = Value + + val Retry, RethrowException, ExhaustedRetries = Value + } + + /** + * Handle exception that was thrown from runMerge(). Search for errors to log, or that can be + * handled by retry. It may need to descend into ex.getCause() to find the errors, as Spark may + * have wrapped them. + * @param isLastAttempt + * indicates that it's the last allowed attempt and there shall be no retry. + * @return + * true if the exception is handled and merge should retry false if the caller should rethrow + * the error + */ + @tailrec + private def handleExceptionDuringAttempt( + ex: Throwable, + isLastAttempt: Boolean, + deltaLog: DeltaLog): RetryHandling.Result = ex match { + // If Merge failed because the materialized source lost blocks from the + // locally checkpointed RDD, we want to retry the whole operation. + // If a checkpointed RDD block is lost, it throws + // SparkCoreErrors.checkpointRDDBlockIdNotFoundError from LocalCheckpointRDD.compute. + case s: SparkException + if materializedSourceRDD.nonEmpty && + s.getMessage.matches( + mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) => + log.warn( + "Materialized Merge source RDD block lost. Merge needs to be restarted. " + + s"This was attempt number $attempt.") + if (!isLastAttempt) { + RetryHandling.Retry + } else { + // Record situations where we lost RDD materialized source blocks, despite retries. + recordDeltaEvent( + deltaLog, + MergeIntoMaterializeSourceError.OP_TYPE, + data = MergeIntoMaterializeSourceError( + errorType = MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString, + attempt = attempt, + materializedSourceRDDStorageLevel = materializedSourceRDD.get.getStorageLevel.toString + ) + ) + RetryHandling.ExhaustedRetries + } + + // Record if we ran out of executor disk space when we materialized the source. + case s: SparkException + if materializedSourceRDD.nonEmpty && + s.getMessage.contains("java.io.IOException: No space left on device") => + // Record situations where we ran out of disk space, possibly because of the space took + // by the materialized RDD. 
+ recordDeltaEvent( + deltaLog, + MergeIntoMaterializeSourceError.OP_TYPE, + data = MergeIntoMaterializeSourceError( + errorType = MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString, + attempt = attempt, + materializedSourceRDDStorageLevel = materializedSourceRDD.get.getStorageLevel.toString + ) + ) + RetryHandling.RethrowException + + // Descend into ex.getCause. + // The errors that we are looking for above might have been wrapped inside another exception. + case NonFatal(ex) if ex.getCause() != null => + handleExceptionDuringAttempt(ex.getCause(), isLastAttempt, deltaLog) + + // Descended to the bottom of the causes without finding a retryable error + case _ => RetryHandling.RethrowException + } + + private def planContainsIgnoreUnreadableFilesReadOptions(plan: LogicalPlan): Boolean = { + def relationContainsOptions(relation: BaseRelation): Boolean = { + relation match { + case hdpRelation: HadoopFsRelation => + hdpRelation.options.get(FileSourceOptions.IGNORE_CORRUPT_FILES).contains("true") || + hdpRelation.options.get(FileSourceOptions.IGNORE_MISSING_FILES).contains("true") + case _ => false + } + } + + val res = plan.collectFirst { + case lr: LogicalRelation if relationContainsOptions(lr.relation) => lr + } + res.nonEmpty + } + + private def ignoreUnreadableFilesConfigsAreSet( + plan: LogicalPlan, + spark: SparkSession): Boolean = { + spark.conf.get(IGNORE_MISSING_FILES) || spark.conf.get(IGNORE_CORRUPT_FILES) || + planContainsIgnoreUnreadableFilesReadOptions(plan) + } + + /** + * @return + * pair of boolean whether source should be materialized and the source materialization reason + */ + protected def shouldMaterializeSource( + spark: SparkSession, + source: LogicalPlan, + isInsertOnly: Boolean + ): (Boolean, MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) = { + val materializeType = spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE) + val forceMaterializationWithUnreadableFiles = + spark.conf.get(DeltaSQLConf.MERGE_FORCE_SOURCE_MATERIALIZATION_WITH_UNREADABLE_FILES) + import DeltaSQLConf.MergeMaterializeSource._ + val checkDeterministicOptions = + DeltaSparkPlanUtils.CheckDeterministicOptions(allowDeterministicUdf = true) + materializeType match { + case ALL => + (true, MergeIntoMaterializeSourceReason.MATERIALIZE_ALL) + case NONE => + (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_NONE) + case AUTO => + if (isInsertOnly && spark.conf.get(DeltaSQLConf.MERGE_INSERT_ONLY_ENABLED)) { + (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO_INSERT_ONLY) + } else if (!planContainsOnlyDeltaScans(source)) { + (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_NON_DELTA) + } else if (!planIsDeterministic(source, checkDeterministicOptions)) { + (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_OPERATORS) + // Force source materialization if Spark configs IGNORE_CORRUPT_FILES, + // IGNORE_MISSING_FILES or file source read options FileSourceOptions.IGNORE_CORRUPT_FILES + // FileSourceOptions.IGNORE_MISSING_FILES are enabled on the source. + // This is done so to prevent irrecoverable data loss or unexpected results. 
+ } else if ( + forceMaterializationWithUnreadableFiles && + ignoreUnreadableFilesConfigsAreSet(source, spark) + ) { + (true, MergeIntoMaterializeSourceReason.IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET) + } else if (planContainsUdf(source)) { + // Force source materialization if the source contains a User Defined Function, even if + // the user defined function is marked as deterministic, as it is often incorrectly marked + // as such. + (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF) + } else { + (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO) + } + case _ => + // If the config is invalidly set, also materialize. + (true, MergeIntoMaterializeSourceReason.INVALID_CONFIG) + } + } + + /** + * If source needs to be materialized, prepare the materialized dataframe in sourceDF Otherwise, + * prepare regular dataframe. + * @return + * the source materialization reason + */ + protected def prepareMergeSource( + spark: SparkSession, + source: LogicalPlan, + condition: Expression, + matchedClauses: Seq[DeltaMergeIntoMatchedClause], + notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause], + isInsertOnly: Boolean): Unit = { + val (materialize, materializeReason) = + shouldMaterializeSource(spark, source, isInsertOnly) + + // --- modified start + val originalRemoveTopmostC2R = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(spark) + try{ + spark.sparkContext.setLocalProperty( + CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW, + "true") + // --- modified end + + if (!materialize) { + // Does not materialize, simply return the dataframe from source plan + mergeSource = Some( + MergeSource( + df = Dataset.ofRows(spark, source), + isMaterialized = false, + materializeReason = materializeReason + ) + ) + return + } + + val referencedSourceColumns = + getReferencedSourceColumns(source, condition, matchedClauses, notMatchedClauses) + // When we materialize the source, we want to make sure that columns got pruned before caching. + val sourceWithSelectedColumns = Project(referencedSourceColumns, source) + val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns) + + // Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage, + // which shall ensure that the source cannot be recomputed and thus become inconsistent. + val checkpointedSourcePlanDF = baseSourcePlanDF + // Set eager=false for now, even if we should be doing eager, so that we can set the storage + // level before executing. + .localCheckpoint(eager = false) + + // We have to reach through the crust and into the plan of the checkpointed DF + // to get the RDD that was actually checkpointed, to be able to unpersist it later... + var checkpointedPlan = checkpointedSourcePlanDF.queryExecution.analyzed + val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd + materializedSourceRDD = Some(rdd) + rdd.setName("mergeMaterializedSource") + + // We should still keep the hints from the input plan. + checkpointedPlan = addHintsToPlan(source, checkpointedPlan) + + mergeSource = Some( + MergeSource( + df = Dataset.ofRows(spark, checkpointedPlan), + isMaterialized = true, + materializeReason = materializeReason + ) + ) + + // Sets appropriate StorageLevel + val storageLevel = StorageLevel.fromString( + if (attempt == 1) { + spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL) + } else { + // If it failed the first time, potentially use a different storage level on retry. 
+ spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY) + } + ) + rdd.persist(storageLevel) + + // WARNING: if eager == false, the source used during the first Spark Job that uses this may + // still be inconsistent with source materialized afterwards. + // This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job + // that triggered the lazy checkpointing finished. + // If blocks were lost during that job, they may still get recomputed and changed compared + // to how they were used during the execution of the job. + if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) { + // Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here. + rdd + .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]]) + .foreach((_: InternalRow) => ()) + assert(rdd.isCheckpointed) + } + + logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns.") + logDebug(s"Materialized MERGE source plan:\n${getMergeSource.df.queryExecution}") + } finally { + // --- modified start + CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(originalRemoveTopmostC2R, spark) + // --- modified end + } + } + + /** Returns the prepared merge source. */ + protected def getMergeSource: MergeSource = mergeSource match { + case Some(source) => source + case None => + throw new IllegalStateException( + "mergeSource was not initialized! Call prepareMergeSource before.") + } + + private def addHintsToPlan(sourcePlan: LogicalPlan, plan: LogicalPlan): LogicalPlan = { + val hints = EliminateResolvedHint.extractHintsFromPlan(sourcePlan)._2 + // This follows similar code in CacheManager from https://github.com/apache/spark/pull/24580 + if (hints.nonEmpty) { + // The returned hint list is in top-down order, we should create the hint nodes from + // right to left. + val planWithHints = + hints.foldRight[LogicalPlan](plan) { + case (hint, p) => + ResolvedHint(p, hint) + } + planWithHints + } else { + plan + } + } +} + +object MergeIntoMaterializeSource { + case class MergeSource( + df: DataFrame, + isMaterialized: Boolean, + materializeReason: MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) { + assert( + !isMaterialized || + MergeIntoMaterializeSourceReason.MATERIALIZED_REASONS.contains(materializeReason)) + } + + // This depends on SparkCoreErrors.checkpointRDDBlockIdNotFoundError msg + def mergeMaterializedSourceRddBlockLostErrorRegex(rddId: Int): String = + s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*" + + /** @return The columns of the source plan that are used in this MERGE */ + private def getReferencedSourceColumns( + source: LogicalPlan, + condition: Expression, + matchedClauses: Seq[DeltaMergeIntoMatchedClause], + notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]) = { + val conditionCols = condition.references + val matchedCondCols = matchedClauses.flatMap(_.condition).flatMap(_.references) + val notMatchedCondCols = notMatchedClauses.flatMap(_.condition).flatMap(_.references) + val matchedActionsCols = matchedClauses + .flatMap(_.resolvedActions) + .flatMap(_.expr.references) + val notMatchedActionsCols = notMatchedClauses + .flatMap(_.resolvedActions) + .flatMap(_.expr.references) + val allCols = AttributeSet( + conditionCols ++ + matchedCondCols ++ + notMatchedCondCols ++ + matchedActionsCols ++ + notMatchedActionsCols) + + source.output.filter(allCols.contains) + } +} + +/** Enumeration with possible reasons that source may be materialized in a MERGE command. 
*/ +object MergeIntoMaterializeSourceReason extends Enumeration { + type MergeIntoMaterializeSourceReason = Value + // It was determined to not materialize on auto config. + val NOT_MATERIALIZED_AUTO = Value("notMaterializedAuto") + // Config was set to never materialize source. + val NOT_MATERIALIZED_NONE = Value("notMaterializedNone") + // Insert only merge is single pass, no need for materialization + val NOT_MATERIALIZED_AUTO_INSERT_ONLY = Value("notMaterializedAutoInsertOnly") + // Config was set to always materialize source. + val MATERIALIZE_ALL = Value("materializeAll") + // The source query is considered non-deterministic, because it contains a non-delta scan. + val NON_DETERMINISTIC_SOURCE_NON_DELTA = Value("materializeNonDeterministicSourceNonDelta") + // The source query is considered non-deterministic, because it contains non-deterministic + // operators. + val NON_DETERMINISTIC_SOURCE_OPERATORS = Value("materializeNonDeterministicSourceOperators") + // Either spark configs to ignore unreadable files are set or the source plan contains relations + // with ignore unreadable files options. + val IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET = + Value("materializeIgnoreUnreadableFilesConfigsAreSet") + // The source query is considered non-determistic because it contains a User Defined Function. + val NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF = + Value("materializeNonDeterministicSourceWithDeterministicUdf") + // Materialize when the configuration is invalid + val INVALID_CONFIG = Value("invalidConfigurationFailsafe") + // Catch-all case. + val UNKNOWN = Value("unknown") + + // Set of reasons that result in source materialization. + final val MATERIALIZED_REASONS: Set[MergeIntoMaterializeSourceReason] = Set( + MATERIALIZE_ALL, + NON_DETERMINISTIC_SOURCE_NON_DELTA, + NON_DETERMINISTIC_SOURCE_OPERATORS, + IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET, + NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF, + INVALID_CONFIG + ) +} + +/** + * Structure with data for "delta.dml.merge.materializeSourceError" event. Note: We log only errors + * that we want to track (out of disk or lost RDD blocks). + */ +case class MergeIntoMaterializeSourceError( + errorType: String, + attempt: Int, + materializedSourceRDDStorageLevel: String +) + +object MergeIntoMaterializeSourceError { + val OP_TYPE = "delta.dml.merge.materializeSourceError" +} + +object MergeIntoMaterializeSourceErrorType extends Enumeration { + type MergeIntoMaterializeSourceError = Value + val RDD_BLOCK_LOST = Value("materializeSourceRDDBlockLostRetriesFailure") + val OUT_OF_DISK = Value("materializeSourceOutOfDiskFailure") +} diff --git a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala new file mode 100644 index 0000000000..ab9479ff66 --- /dev/null +++ b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution._ +import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow + +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +// Some sqls' line length exceeds 100 +// scalastyle:off line.size.limit + +class RDDScanSuite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper { + + override protected val needCopyParquetToTablePath = true + override protected val tablesPath: String = basePath + "/tpch-data" + override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" + override protected val queriesResults: String = rootPath + "queries-output" + + override protected def createTPCHNotNullTables(): Unit = { + createNotNullTPCHTablesInParquet(tablesPath) + } + + test("test RDDScanTransform") { + val test_sql = + """ + |SELECT + | l_returnflag, + | l_linestatus, + | sum(l_quantity) AS sum_qty + |FROM + | lineitem + |WHERE + | l_shipdate <= date'1998-09-02' - interval 1 day + |GROUP BY + | l_returnflag, + | l_linestatus + |""".stripMargin + + val expectedAnswer = sql(test_sql).collect() + + spark.sparkContext.setLocalProperty( + CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW, + "true") + val data = sql(test_sql) + val node = LogicalRDD.fromDataset( + rdd = data.queryExecution.toRdd, + originDataset = data, + isStreaming = false) + + spark.sparkContext.setLocalProperty( + CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW, + "false") + val df = Dataset.ofRows(data.sparkSession, node).toDF() + checkAnswer(df, expectedAnswer) + + var cnt = df.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true } + assertResult(1)(cnt.size) + + val data2 = sql(test_sql) + val node2 = LogicalRDD.fromDataset( + rdd = data2.queryExecution.toRdd, + originDataset = data2, + isStreaming = false) + + val df2 = Dataset.ofRows(data2.sparkSession, node2).toDF() + checkAnswer(df2, expectedAnswer) + cnt = df2.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true } + assertResult(1)(cnt.size) + } +} diff --git a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala index 1e9f0f9d77..f388ed8623 100644 --- a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala +++ b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala @@ -19,6 +19,7 @@ package org.apache.gluten.component import org.apache.gluten.backendsapi.clickhouse.CHBackend import org.apache.gluten.execution.OffloadKafkaScan +import org.apache.gluten.extension.columnar.KafkaMiscColumnarRules.RemoveStreamingTopmostColumnarToRow import org.apache.gluten.extension.injector.Injector class CHKafkaComponent extends Component { @@ -28,5 +29,6 @@ class CHKafkaComponent extends Component { override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil override def injectRules(injector: Injector): Unit = { 
OffloadKafkaScan.inject(injector) + injector.gluten.legacy.injectPost(c => RemoveStreamingTopmostColumnarToRow(c.session, c.caller.isStreaming())) } } diff --git a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala new file mode 100644 index 0000000000..38e163cb44 --- /dev/null +++ b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension.columnar + +import org.apache.gluten.execution.MicroBatchScanExecTransformer +import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor + +object KafkaMiscColumnarRules { + // Remove topmost columnar-to-row. 
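  // The rule below only rewrites streaming plans: it bails out unless isStreamingPlan is true
  // and the plan actually contains a MicroBatchScanExecTransformer. When it does apply, the
  // topmost ColumnarToRowLike node is replaced with FakeRowAdaptor, so the sink side receives
  // FakeRow-wrapped columnar batches instead of row-converted output.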
+ case class RemoveStreamingTopmostColumnarToRow(session: SparkSession, isStreamingPlan: Boolean) + extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = { + if ( + !isStreamingPlan || plan.collectFirst { case e: MicroBatchScanExecTransformer => e }.isEmpty + ) { + return plan + } + + plan match { + case ColumnarToRowLike(child) => wrapperFakeRowAdaptor(child) + case other => other + } + } + + private def wrapperFakeRowAdaptor(plan: SparkPlan): SparkPlan = { + FakeRowAdaptor(plan) + } + } +} diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java index d9006a098d..1885538f9b 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java @@ -74,6 +74,8 @@ public class BlockOutputStream implements Closeable { private native void nativeFlush(long instance); + public static native long directWrite(OutputStream stream, byte[] buf, int size, long block); + public void write(ColumnarBatch cb) { if (cb.numCols() == 0 || cb.numRows() == 0) return; CHNativeBlock block = CHNativeBlock.fromColumnarBatch(cb); diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java index c8cce61b4f..8cf365335c 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java @@ -77,6 +77,12 @@ public class CHNativeBlock { return nativeBlockStats(blockAddress, columnPosition); } + public static native long copyBlock(long blockAddress); + + public ColumnarBatch copyColumnarBatch() { + return new CHNativeBlock(copyBlock(blockAddress)).toColumnarBatch(); + } + public void close() { if (blockAddress != 0) { nativeClose(blockAddress); diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java index 3151adfceb..0e081a099a 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java @@ -49,6 +49,8 @@ public class CHStreamReader implements AutoCloseable { private native long nativeNext(long nativeShuffleReader); + public static native long directRead(InputStream inputStream, byte[] buffer, int bufferSize); + public CHNativeBlock next() { long block = nativeNext(nativeShuffleReader); return new CHNativeBlock(block); diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 374e223ea9..411351e9a4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -135,6 +135,7 @@ object CHRuleApi { // Gluten columnar: Post rules. 
injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe())) + injector.injectPost(c => CHRemoveTopmostColumnarToRow(c.session, c.caller.isAqe())) SparkShimLoader.getSparkShims .getExtendedColumnarPostRules() .foreach(each => injector.injectPost(c => intercept(each(c.session)))) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index c77e7bc31b..eecf0588d4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -27,7 +27,7 @@ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy} -import org.apache.gluten.vectorized.CHColumnarBatchSerializer +import org.apache.gluten.vectorized.{BlockOutputStream, CHColumnarBatchSerializer, CHNativeBlock, CHStreamReader} import org.apache.spark.ShuffleDependency import org.apache.spark.internal.Logging @@ -58,6 +58,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.commons.lang3.ClassUtils +import java.io.{ObjectInputStream, ObjectOutputStream} import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} @@ -525,6 +526,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { wrapChild(union) case ordered: TakeOrderedAndProjectExecTransformer => wrapChild(ordered) + case rddScan: CHRDDScanTransformer => + wrapChild(rddScan) case other => throw new GlutenNotSupportException( s"Not supported operator ${other.nodeName} for BroadcastRelation") @@ -965,4 +968,29 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { case co: FlattenedOr => GenericExpressionTransformer(co.name, children, co) case _ => super.genFlattenedExpressionTransformer(substraitName, children, expr) } + + override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true + + override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer = + CHRDDScanTransformer.replace(plan) + + override def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch = + CHNativeBlock.fromColumnarBatch(batch).copyColumnarBatch() + + override def serializeColumnarBatch(output: ObjectOutputStream, batch: ColumnarBatch): Unit = { + val writeBuffer: Array[Byte] = + new Array[Byte](CHBackendSettings.customizeBufferSize) + BlockOutputStream.directWrite( + output, + writeBuffer, + CHBackendSettings.customizeBufferSize, + CHNativeBlock.fromColumnarBatch(batch).blockAddress()) + } + + override def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch = { + val bufferSize = CHBackendSettings.customizeBufferSize + val readBuffer: Array[Byte] = new Array[Byte](bufferSize) + val address = CHStreamReader.directRead(input, readBuffer, bufferSize) + new CHNativeBlock(address).toColumnarBatch + } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala new file mode 100644 index 0000000000..d05cbb28ce --- /dev/null +++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension + +import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike} + +// Remove the topmost columnar-to-row conversion +// Primarily for structured streaming and delta deletion vector +// +// Sometimes, the code uses dataFrame.queryExecution.toRdd as the data source. +// queryExecution will use columnar-to-row (c2r) and row-to-columnar (r2c) +// conversions for the next operation. +// This rule aims to eliminate the redundant double conversion. 
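A minimal sketch of the intended opt-in pattern, mirroring the prepareMergeSource change earlier in this commit (spark and df here stand for an arbitrary SparkSession and DataFrame; the helpers are the ones defined in the companion object below):

    val previous = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(spark)
    try {
      CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(true, spark)
      // With the local property set, the topmost ColumnarToRow is expected to be replaced
      // by FakeRowAdaptor, so toRdd yields FakeRow-wrapped columnar batches rather than
      // row-converted data.
      val rdd = df.queryExecution.toRdd
      // ... consume rdd, e.g. feed it into LogicalRDD.fromDataset as RDDScanSuite does ...
    } finally {
      // Restore the previous value so unrelated queries keep the default behavior.
      CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(previous, spark)
    }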
+case class CHRemoveTopmostColumnarToRow(session: SparkSession, isAdaptiveContext: Boolean) + extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = { + val removeTopmostColumnarToRow = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(session) + + if (!removeTopmostColumnarToRow) { + return plan + } + + plan match { + case shuffleExchangeLike @ ColumnarToRowLike(_: ShuffleExchangeLike) => + shuffleExchangeLike + case broadcastExchangeLike @ ColumnarToRowLike(_: BroadcastExchangeLike) => + broadcastExchangeLike + case broadcastQueryStageExec @ ColumnarToRowLike(_: BroadcastQueryStageExec) => + broadcastQueryStageExec + case ColumnarToRowLike(child) => wrapperColumnarRowAdaptor(child) + case other => other + } + } + + private def wrapperColumnarRowAdaptor(plan: SparkPlan): SparkPlan = { + FakeRowAdaptor(plan) + } +} + +object CHRemoveTopmostColumnarToRow { + val REMOVE_TOPMOST_COLUMNAR_TO_ROW: String = "gluten.removeTopmostColumnarToRow" + + def isRemoveTopmostC2R(spark: SparkSession): Boolean = { + Option(spark.sparkContext.getLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW)).exists(_.toBoolean) + } + + def setRemoveTopmostC2R(value: Boolean, spark: SparkSession): Unit = { + spark.sparkContext.setLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW, value.toString) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala new file mode 100644 index 0000000000..70491b295e --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.SparkRowIterator +import org.apache.gluten.expression.ConverterUtils +import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.vectorized.{CHBlockConverterJniWrapper, CHNativeBlock} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.datasources.FakeRow +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class CHRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + override val outputPartitioning: Partitioning = UnknownPartitioning(0), + override val outputOrdering: Seq[SortOrder] +) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) { + + override protected def doValidateInternal(): ValidationResult = { + output + .foreach(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)) + ValidationResult.succeeded + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val localSchema = this.schema + val fieldNames = output.map(ConverterUtils.genColumnNameWithExprId).toArray + val fieldTypes = output + .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable).toProtobuf.toByteArray) + .toArray + + rdd.mapPartitions( + it => { + if (it.hasNext) { + val projection = UnsafeProjection.create(localSchema) + + val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] { + var sample = false + var isBatch = false + var byteArrayIterator: Iterator[Array[Byte]] = _ + private var last_address: Long = 0 + + override def hasNext: Boolean = { + if (isBatch || !sample) { + it.hasNext + } else { + if (last_address != 0) { + CHBlockConverterJniWrapper.freeBlock(last_address) + last_address = 0 + } + byteArrayIterator.hasNext + } + } + + override def next(): ColumnarBatch = { + if (isBatch) { + return it.next().asInstanceOf[FakeRow].batch + } + + if (sample) { + val slice = byteArrayIterator.take(8192) + val sparkRowIterator = new SparkRowIterator(slice) + last_address = CHBlockConverterJniWrapper + .convertSparkRowsToCHColumn(sparkRowIterator, fieldNames, fieldTypes) + return new CHNativeBlock(last_address).toColumnarBatch + } + + sample = true + val data = it.next() + data match { + case row: FakeRow => + isBatch = true + return row.batch + case _ => + byteArrayIterator = it.map { + case u: UnsafeRow => u.getBytes + case i: InternalRow => projection.apply(i).getBytes + } + + // deal first block + val bytes = data match { + case u: UnsafeRow => u.getBytes + case i: InternalRow => projection.apply(i).getBytes + } + + val sparkRowIterator = new SparkRowIterator(Iterator.apply(bytes)) + last_address = CHBlockConverterJniWrapper + .convertSparkRowsToCHColumn(sparkRowIterator, fieldNames, fieldTypes) + new CHNativeBlock(last_address).toColumnarBatch + } + } + + } + res + } else { + Iterator.empty + } + }) + } + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = + copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering) +} + +object CHRDDScanTransformer { + def replace(rdd: RDDScanExec): RDDScanTransformer = + CHRDDScanTransformer( + rdd.output, + rdd.inputRDD, + rdd.nodeName, + rdd.outputPartitioning, + rdd.outputOrdering) +} diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala index 02e5b34a37..2250f8bfe3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala @@ -563,7 +563,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS | str_to_map('a,b', ',', ''), | str_to_map('a:c|b:c', '\\|', ':') |""".stripMargin - runQueryAndCompare(sql1, true, false)(checkGlutenOperatorMatch[ProjectExecTransformer]) + runQueryAndCompare(sql1, true)(checkGlutenOperatorMatch[ProjectExecTransformer]) } test("test parse_url") { @@ -622,14 +622,12 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { // Test codec with 'US-ASCII' runQueryAndCompare( - "SELECT decode(encode('Spark SQL', 'US-ASCII'), 'US-ASCII')", - noFallBack = false + "SELECT decode(encode('Spark SQL', 'US-ASCII'), 'US-ASCII')" )(checkGlutenOperatorMatch[ProjectExecTransformer]) // Test codec with 'UTF-16' runQueryAndCompare( - "SELECT decode(encode('Spark SQL', 'UTF-16'), 'UTF-16')", - noFallBack = false + "SELECT decode(encode('Spark SQL', 'UTF-16'), 'UTF-16')" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -645,8 +643,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( - "select cast('7.921901' as float), cast('7.921901' as double)", - noFallBack = false + "select cast('7.921901' as float), cast('7.921901' as double)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -1061,8 +1058,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( - "select cast(' \t2570852431\n' as long), cast('25708\t52431\n' as long)", - noFallBack = false + "select cast(' \t2570852431\n' as long), cast('25708\t52431\n' as long)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala index 0c4f865b26..c0dfbed7d1 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala @@ -109,7 +109,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select array() as x union all select array(123) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array: convert nullable to nullable 1") { @@ -117,7 +117,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select array() as x union all select array(123, null) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array: convert nullable to nullable 2") { @@ -125,7 +125,7 @@ class 
GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select array() as x union all select array(null) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array: null array") { @@ -133,7 +133,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select array(null) |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing map: convert nullable to non-nullable") { @@ -141,7 +141,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select map() as x union all select map(123, 456) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing map: convert nullable to nullable 1") { @@ -149,7 +149,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select map() as x union all select map(1, null, 2, 23) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array in map 1") { @@ -157,7 +157,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select map(1, null) as x union all select map(1, array(456)) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array in map 2") { @@ -165,7 +165,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select map(1, array()) as x union all select map(1, array(456)) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array in map 3") { @@ -173,7 +173,7 @@ class GlutenNothingValueCheck extends GlutenClickHouseWholeStageTransformerSuite """ |select map(1, array()) as x union all select map(1, array(456, null)) as x |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }, false) + compareResultsAgainstVanillaSpark(sql, true, { _ => }) } test("nothing array in shuffle") { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala index 907a5323a3..38d45db098 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala @@ -187,8 +187,8 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { } test("array decimal32 CH column to row") { - compareResultsAgainstVanillaSpark("SELECT array(1.0, 2.0)", true, { _ => }, false) - compareResultsAgainstVanillaSpark("SELECT map(1.0, '2', 3.0, '4')", true, { _ => }, false) + compareResultsAgainstVanillaSpark("SELECT array(1.0, 2.0)", true, { _ => }) + compareResultsAgainstVanillaSpark("SELECT map(1.0, '2', 3.0, '4')", true, { _ => }) } test("array decimal32 spark row to CH column") { @@ -281,8 +281,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { """ |select cast(map(1,'2') 
as string) |""".stripMargin, - true, - false + true )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -443,7 +442,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { { _ => } ) val q = "select cast(a as string) from (select array('123',NULL) as a)" - compareResultsAgainstVanillaSpark(q, true, { _ => }, false) + compareResultsAgainstVanillaSpark(q, true, { _ => }) } } @@ -505,8 +504,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { |cast(map(array(1), map("aa", "123\'")) as string), |cast(named_struct("a", "test\'", "b", 1) as string), |cast(named_struct("a", "test\'", "b", 1, "c", struct("\'test"), "d", array('123\'')) as string) - |""".stripMargin, - noFallBack = false + |""".stripMargin )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index 18db7e4070..19544e7f0a 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -422,13 +422,11 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr runQueryAndCompare( "select a from (select array_intersect(array(null,1,2,3,null), array(3,5,1,null,null)) as arr) " + - "lateral view explode(arr) as a order by a", - noFallBack = false + "lateral view explode(arr) as a order by a" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( - "select array_intersect(array(null,1,2,3,null), cast(null as array<int>))", - noFallBack = false + "select array_intersect(array(null,1,2,3,null), cast(null as array<int>))" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( @@ -449,8 +447,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr runQueryAndCompare( "select array_position(array(1,2,3,null), 1), array_position(array(1,2,3,null), null)," + "array_position(array(1,2,3,null), 5), array_position(array(1,2,3), 5), " + - "array_position(array(1,2,3), 2), array_position(cast(null as array<int>), 1)", - noFallBack = false + "array_position(array(1,2,3), 2), array_position(cast(null as array<int>), 1)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -465,8 +462,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr runQueryAndCompare( "select array_contains(array(1,2,3,null), 1), array_contains(array(1,2,3,null), " + "cast(null as int)), array_contains(array(1,2,3,null), 5), array_contains(array(1,2,3), 5)," + - "array_contains(array(1,2,3), 2), array_contains(cast(null as array<int>), 1)", - noFallBack = false + "array_contains(array(1,2,3), 2), array_contains(cast(null as array<int>), 1)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -483,8 +479,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( - "select sort_array(array(1,3,2,null)), sort_array(array(1,2,3,null),false)", - noFallBack = false + "select sort_array(array(1,3,2,null)), sort_array(array(1,2,3,null),false)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -535,8 +530,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite 
extends GlutenClickHouseTPCHAbstr SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( "select find_in_set(null, 'a'), find_in_set('a', null), " + - "find_in_set('a', 'a,b'), find_in_set('a', 'ab,ab')", - noFallBack = false + "find_in_set('a', 'a,b'), find_in_set('a', 'ab,ab')" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -587,8 +581,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr "select elt(2, n_comment, n_regionkey) from nation" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( - "select elt(null, 'a', 'b'), elt(0, 'a', 'b'), elt(1, 'a', 'b'), elt(3, 'a', 'b')", - noFallBack = false + "select elt(null, 'a', 'b'), elt(0, 'a', 'b'), elt(1, 'a', 'b'), elt(3, 'a', 'b')" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -601,8 +594,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( "select array_max(null), array_max(array(null)), array_max(array(1, 2, 3, null)), " + - "array_max(array(1.0, 2.0, 3.0, null)), array_max(array('z', 't', 'abc'))", - noFallBack = false + "array_max(array(1.0, 2.0, 3.0, null)), array_max(array('z', 't', 'abc'))" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -615,8 +607,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( "select array_min(null), array_min(array(null)), array_min(array(1, 2, 3, null)), " + - "array_min(array(1.0, 2.0, 3.0, null)), array_min(array('z', 't', 'abc'))", - noFallBack = false + "array_min(array(1.0, 2.0, 3.0, null)), array_min(array('z', 't', 'abc'))" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -664,8 +655,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr runQueryAndCompare( "select array_distinct(array(1,2,1,2,3)), array_distinct(array(null,1,null,1,2,null,3)), " + - "array_distinct(array(array(1,null,2), array(1,null,2))), array_distinct(null), array_distinct(array(null))", - noFallBack = false + "array_distinct(array(array(1,null,2), array(1,null,2))), array_distinct(null), array_distinct(array(null))" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -682,8 +672,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr "array_union(array(null,1,null,1,2,null,3), array(1,null,2,null,3,null,4)), " + "array_union(array(array(1,null,2), array(2,null,3)), array(array(2,null,3), array(1,null,2))), " + "array_union(array(null), array(null)), " + - "array_union(cast(null as array<int>), cast(null as array<int>))", - noFallBack = false + "array_union(cast(null as array<int>), cast(null as array<int>))" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -698,8 +687,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr runQueryAndCompare( "select shuffle(array(1,2,3,4,5)), shuffle(array(1,3,null,3,4)), shuffle(null)", - compareResult = false, - noFallBack = false + compareResult = false )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } @@ -1235,8 +1223,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( "select sequence(null, 1), sequence(1, null), sequence(1, 3, null), sequence(1, 5)," + 
- "sequence(5, 1), sequence(1, 5, 2), sequence(5, 1, -2)", - noFallBack = false + "sequence(5, 1), sequence(1, 5, 2), sequence(5, 1, -2)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( @@ -1935,8 +1922,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr runQueryAndCompare( "select concat_ws(null), concat_ws('-'), concat_ws('-', null), concat_ws('-', null, null), " + "concat_ws(null, 'a'), concat_ws('-', 'a'), concat_ws('-', 'a', null), " + - "concat_ws('-', 'a', null, 'b', 'c', null, array(null), array('d', null), array('f', 'g'))", - noFallBack = false + "concat_ws('-', 'a', null, 'b', 'c', null, array(null), array('d', null), array('f', 'g'))" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( @@ -2195,8 +2181,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr withSQLConf( SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( - "select 1/0f, 1/0.0d", - noFallBack = false + "select 1/0f, 1/0.0d" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( @@ -2439,8 +2424,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr withSQLConf( SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( - "select trunc('2023-12-06', 'MM'), trunc('2023-12-06', 'YEAR'), trunc('2023-12-06', 'WEEK'), trunc('2023-12-06', 'QUARTER')", - noFallBack = false + "select trunc('2023-12-06', 'MM'), trunc('2023-12-06', 'YEAR'), trunc('2023-12-06', 'WEEK'), trunc('2023-12-06', 'QUARTER')" )(checkGlutenOperatorMatch[ProjectExecTransformer]) runQueryAndCompare( @@ -2465,8 +2449,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr withSQLConf( SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + "," + NullPropagation.ruleName)) { runQueryAndCompare( - "select cast('1' as boolean), cast('t' as boolean), cast('all' as boolean), cast('f' as boolean)", - noFallBack = false + "select cast('1' as boolean), cast('t' as boolean), cast('all' as boolean), cast('f' as boolean)" )(checkGlutenOperatorMatch[ProjectExecTransformer]) } } diff --git a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h index de338ce6f0..d3cf296e56 100644 --- a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h +++ b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h @@ -99,7 +99,6 @@ public: auto & to_tuple = assert_cast<DB::ColumnTuple &>(to); auto & cardinality = assert_cast<DB::ColumnInt64 &>(to_tuple.getColumn(0)); auto & last = assert_cast<DB::ColumnInt64 &>(to_tuple.getColumn(1)); - auto a = to_tuple.getColumn(2).getDataType(); auto & bitmap = assert_cast<DB::ColumnString &>(to_tuple.getColumn(2)); this->data(place).insertResultInto(cardinality, last, bitmap); } diff --git a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp index a6ce6fd408..18417fb3b7 100644 --- a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp +++ b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp @@ -57,7 +57,7 @@ ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr> { const StringRef str_ref{spark_row_reader.getStringRef(i)}; if (str_ref.data == nullptr) - columns[i]->insertData(nullptr, str_ref.size); + 
columns[i]->insertDefault(); else if (!spark_row_reader.isBigEndianInSparkRow(i)) columns[i]->insertData(str_ref.data, str_ref.size); else diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp index 143fc19ae8..dcead78ce0 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp @@ -62,10 +62,10 @@ ShuffleReader::~ShuffleReader() input_stream.reset(); } -jclass ShuffleReader::input_stream_class = nullptr; -jmethodID ShuffleReader::input_stream_read = nullptr; +jclass ShuffleReader::shuffle_input_stream_class = nullptr; +jmethodID ShuffleReader::shuffle_input_stream_read = nullptr; -bool ReadBufferFromJavaInputStream::nextImpl() +bool ReadBufferFromJavaShuffleInputStream::nextImpl() { int count = readFromJava(); if (count > 0) @@ -73,20 +73,20 @@ bool ReadBufferFromJavaInputStream::nextImpl() return count > 0; } -int ReadBufferFromJavaInputStream::readFromJava() const +int ReadBufferFromJavaShuffleInputStream::readFromJava() const { GET_JNIENV(env) jint count = safeCallIntMethod( - env, java_in, ShuffleReader::input_stream_read, reinterpret_cast<jlong>(internal_buffer.begin()), internal_buffer.size()); + env, java_in, ShuffleReader::shuffle_input_stream_read, reinterpret_cast<jlong>(internal_buffer.begin()), internal_buffer.size()); CLEAN_JNIENV return count; } -ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject input_stream) : java_in(input_stream) +ReadBufferFromJavaShuffleInputStream::ReadBufferFromJavaShuffleInputStream(jobject input_stream) : java_in(input_stream) { } -ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream() +ReadBufferFromJavaShuffleInputStream::~ReadBufferFromJavaShuffleInputStream() { GET_JNIENV(env) env->DeleteGlobalRef(java_in); diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.h b/cpp-ch/local-engine/Shuffle/ShuffleReader.h index 721a157849..d1d36e176b 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleReader.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.h @@ -31,7 +31,7 @@ class NativeReader; namespace local_engine { void configureCompressedReadBuffer(DB::CompressedReadBuffer & compressedReadBuffer); -class ReadBufferFromJavaInputStream; +class ReadBufferFromJavaShuffleInputStream; class ShuffleReader : BlockIterator { public: @@ -39,8 +39,8 @@ public: std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64 max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_); DB::Block * read(); ~ShuffleReader(); - static jclass input_stream_class; - static jmethodID input_stream_read; + static jclass shuffle_input_stream_class; + static jmethodID shuffle_input_stream_read; private: std::unique_ptr<DB::ReadBuffer> in; @@ -52,11 +52,11 @@ private: }; -class ReadBufferFromJavaInputStream final : public DB::BufferWithOwnMemory<DB::ReadBuffer> +class ReadBufferFromJavaShuffleInputStream final : public DB::BufferWithOwnMemory<DB::ReadBuffer> { public: - explicit ReadBufferFromJavaInputStream(jobject input_stream); - ~ReadBufferFromJavaInputStream() override; + explicit ReadBufferFromJavaShuffleInputStream(jobject input_stream); + ~ReadBufferFromJavaShuffleInputStream() override; private: jobject java_in; diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp index 7b016eb886..e2f94e7cb7 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp @@ -25,17 +25,19 @@ #include <IO/VarInt.h> #include 
<Storages/IO/AggregateSerializationUtils.h> #include <Storages/IO/NativeWriter.h> +#include <jni/jni_common.h> #include <Common/Arena.h> +#include <Common/JNIUtils.h> namespace DB { namespace ErrorCodes { - extern const int INCORRECT_INDEX; - extern const int LOGICAL_ERROR; - extern const int CANNOT_READ_ALL_DATA; - extern const int INCORRECT_DATA; - extern const int TOO_LARGE_ARRAY_SIZE; +extern const int INCORRECT_INDEX; +extern const int LOGICAL_ERROR; +extern const int CANNOT_READ_ALL_DATA; +extern const int INCORRECT_DATA; +extern const int TOO_LARGE_ARRAY_SIZE; } } @@ -68,10 +70,8 @@ DB::Block NativeReader::read() /// Append small blocks into a large one, could reduce memory allocations overhead. while (result_block.rows() < max_block_size && result_block.bytes() < max_block_bytes) - { if (!appendNextBlock(result_block)) break; - } if (result_block.rows()) { @@ -85,7 +85,8 @@ DB::Block NativeReader::read() return result_block; } -static void readFixedSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util) +static void +readFixedSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util) { ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable()); auto & arena = real_column.createOrGetArena(); @@ -102,7 +103,8 @@ static void readFixedSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & colum } } -static void readVarSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util) +static void +readVarSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util) { ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable()); auto & arena = real_column.createOrGetArena(); @@ -119,7 +121,8 @@ static void readVarSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & column, } } -static void readNormalSimpleData(DB::ReadBuffer &in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util) +static void +readNormalSimpleData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util) { ISerialization::DeserializeBinaryBulkSettings settings; settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &in; }; @@ -146,7 +149,7 @@ readNormalComplexData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t rows, ISerialization::DeserializeBinaryBulkStatePtr state; DB::ColumnPtr new_col = column->cloneResized(0); - column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings, state ,nullptr); + column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings, state, nullptr); column_parse_util.serializer->deserializeBinaryBulkWithMultipleStreams(new_col, 0, rows, settings, state, nullptr); column->assumeMutable()->insertRangeFrom(*new_col, 0, new_col->size()); } @@ -201,7 +204,7 @@ DB::Block NativeReader::prepareByFirstBlock() /// Data ColumnPtr read_column = column.type->createColumn(*serialization); - if (rows) /// If no rows, nothing to read. + if (rows) /// If no rows, nothing to read. 
{ if (is_agg_state_type && agg_opt_column) { @@ -272,4 +275,36 @@ bool NativeReader::appendNextBlock(DB::Block & result_block) return true; } +jclass ReadBufferFromJavaInputStream::input_stream_class = nullptr; +jmethodID ReadBufferFromJavaInputStream::input_stream_read = nullptr; + +bool ReadBufferFromJavaInputStream::nextImpl() +{ + int count = readFromJava(); + if (count > 0) + working_buffer.resize(count); + return count > 0; +} + +int ReadBufferFromJavaInputStream::readFromJava() const +{ + GET_JNIENV(env) + jint count = safeCallIntMethod(env, input_stream, input_stream_read, buffer); + + if (count > 0) + env->GetByteArrayRegion(buffer, 0, count, reinterpret_cast<jbyte *>(internal_buffer.begin())); + + CLEAN_JNIENV + return count; +} + +ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject input_stream_, jbyteArray buffer_, const size_t buffer_size_) + : input_stream(input_stream_), buffer(buffer_), buffer_size(buffer_size_) +{ +} + +ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream() +{ +} + } diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.h b/cpp-ch/local-engine/Storages/IO/NativeReader.h index 4e85526e7d..bf8246a623 100644 --- a/cpp-ch/local-engine/Storages/IO/NativeReader.h +++ b/cpp-ch/local-engine/Storages/IO/NativeReader.h @@ -16,10 +16,12 @@ */ #pragma once -#include <Common/PODArray.h> +#include <jni.h> #include <Core/Block.h> #include <Core/Defines.h> #include <DataTypes/DataTypeAggregateFunction.h> +#include <IO/BufferWithOwnMemory.h> +#include <IO/ReadBuffer.h> namespace local_engine { @@ -34,14 +36,13 @@ public: std::string name; DB::SerializationPtr serializer = nullptr; size_t avg_value_size_hint = 0; - + // for aggregate data size_t aggregate_state_size = 0; size_t aggregate_state_align = 0; DB::AggregateFunctionPtr aggregate_function = nullptr; std::function<void(DB::ReadBuffer &, DB::ColumnPtr &, size_t, ColumnParseUtil &)> parse; - }; NativeReader( @@ -72,4 +73,21 @@ private: bool appendNextBlock(DB::Block & result_block); }; +class ReadBufferFromJavaInputStream final : public DB::BufferWithOwnMemory<DB::ReadBuffer> +{ +public: + static jclass input_stream_class; + static jmethodID input_stream_read; + + explicit ReadBufferFromJavaInputStream(jobject input_stream_, jbyteArray buffer_, size_t buffer_size_); + ~ReadBufferFromJavaInputStream() override; + +private: + jobject input_stream; + size_t buffer_size; + jbyteArray buffer; + int readFromJava() const; + bool nextImpl() override; +}; + } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp index 926c3ab215..2997e990b5 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp @@ -91,10 +91,10 @@ void DeltaWriter::writeDeletionVector(const DB::Block & block) for (size_t row_idx = 0; row_idx < block.rows(); row_idx++) { const auto file_path = file_path_columns.column->getDataAt(row_idx); - auto bitmap = bitmap_columns.column->getDataAt(row_idx); + auto bitmap = bitmap_columns.column->getDataAt(row_idx).toString(); auto cardinality = cardinality_src_columns.column->get64(row_idx); // alisa deletedRowIndexCount - if (size_of_current_bin > 0 && bitmap.size + size_of_current_bin > packing_target_size) + if (size_of_current_bin > 0 && bitmap.length() + size_of_current_bin > packing_target_size) { write_buffer->finalize(); write_buffer = nullptr; @@ -120,7 +120,7 @@ void 
DeltaWriter::writeDeletionVector(const DB::Block & block) { DeltaDVRoaringBitmapArray existing_bitmap = deserializeExistingBitmap(existing_path_or_inline_dv, existing_offset, existing_size_in_bytes, table_path); - existing_bitmap.merge(bitmap.toString()); + existing_bitmap.merge(bitmap); bitmap = existing_bitmap.serialize(); cardinality = existing_bitmap.cardinality(); } @@ -140,12 +140,13 @@ void DeltaWriter::writeDeletionVector(const DB::Block & block) if (!write_buffer) initBinPackage(); - size_of_current_bin = size_of_current_bin + bitmap.size; - Int32 bitmap_size = static_cast<Int32>(bitmap.size); + Int32 bitmap_size = static_cast<Int32>(bitmap.length()); + size_of_current_bin = size_of_current_bin + bitmap.length(); + DB::writeBinaryBigEndian(bitmap_size, *write_buffer); - write_buffer->write(bitmap.data, bitmap.size); - Int32 checksum_value = static_cast<Int32>(crc32_z(0L, reinterpret_cast<const unsigned char *>(bitmap.data), bitmap_size)); + write_buffer->write(bitmap.c_str(), bitmap_size); + Int32 checksum_value = static_cast<Int32>(crc32_z(0L, reinterpret_cast<const unsigned char *>(bitmap.c_str()), bitmap_size)); DB::writeBinaryBigEndian(checksum_value, *write_buffer); auto dv_descriptor_field diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h index f6d7372ff4..5a08a33169 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h @@ -64,7 +64,7 @@ private: void initBinPackage(); - const DB::ContextPtr & context; + DB::ContextPtr context; const String table_path; const size_t prefix_length; const size_t packing_target_size; diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index e3ac674360..13b7cf2b54 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -121,7 +121,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) block_stats_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/BlockStats;"); block_stats_constructor = local_engine::GetMethodID(env, block_stats_class, "<init>", "(JZ)V"); - local_engine::ShuffleReader::input_stream_class + local_engine::ShuffleReader::shuffle_input_stream_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/ShuffleInputStream;"); local_engine::NativeSplitter::iterator_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/IteratorWrapper;"); @@ -134,8 +134,13 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); - local_engine::ShuffleReader::input_stream_read - = local_engine::GetMethodID(env, local_engine::ShuffleReader::input_stream_class, "read", "(JJ)J"); + local_engine::ReadBufferFromJavaInputStream::input_stream_class + = local_engine::CreateGlobalClassReference(env, "Ljava/io/InputStream;"); + local_engine::ReadBufferFromJavaInputStream::input_stream_read + = local_engine::GetMethodID(env, local_engine::ReadBufferFromJavaInputStream::input_stream_class, "read", "([B)I"); + + local_engine::ShuffleReader::shuffle_input_stream_read + = local_engine::GetMethodID(env, local_engine::ShuffleReader::shuffle_input_stream_class, "read", "(JJ)J"); 
local_engine::NativeSplitter::iterator_has_next = local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); @@ -205,7 +210,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n env->DeleteGlobalRef(block_stripes_class); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(block_stats_class); - env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class); + env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class); env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); @@ -536,12 +541,24 @@ Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, j LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } +JNIEXPORT jlong +Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env, jobject obj, jlong block_address) +{ + LOCAL_ENGINE_JNI_METHOD_START + DB::Block * block = reinterpret_cast<DB::Block *>(block_address); + + auto copied_block = block->cloneWithColumns(block->getColumns()); + auto a = new DB::Block(copied_block); + return reinterpret_cast<jlong>(a); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) +} + JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader( JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes) { LOCAL_ENGINE_JNI_METHOD_START auto * input = env->NewGlobalRef(input_stream); - auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input); + auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaShuffleInputStream>(input); auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes); return reinterpret_cast<jlong>(shuffle_reader); @@ -557,6 +574,19 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_nativeNext(JNIE LOCAL_ENGINE_JNI_METHOD_END(env, -1) } +JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_directRead( + JNIEnv * env, jclass /*clazz*/, jobject input_stream, jbyteArray buffer, jint buffer_size) +{ + LOCAL_ENGINE_JNI_METHOD_START + // auto * input = env->NewGlobalRef(input_stream); + auto rb = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input_stream, buffer, buffer_size); + auto reader = std::make_unique<local_engine::NativeReader>(*rb); + DB::Block block = reader->read(); + DB::Block * res = new DB::Block(block); + return reinterpret_cast<jlong>(res); + LOCAL_ENGINE_JNI_METHOD_END(env, -1) +} + JNIEXPORT void Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { LOCAL_ENGINE_JNI_METHOD_START @@ -1192,6 +1222,25 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate LOCAL_ENGINE_JNI_METHOD_END(env, -1) } +JNIEXPORT long Java_org_apache_gluten_vectorized_BlockOutputStream_directWrite( + JNIEnv * env, + jclass, + jobject output_stream, + jbyteArray buffer, + jint customize_buffer_size, + jlong block_address) +{ + LOCAL_ENGINE_JNI_METHOD_START + DB::Block * block = reinterpret_cast<DB::Block *>(block_address); + auto wb = std::make_shared<local_engine::WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size); + auto native_writer = 
std::make_unique<local_engine::NativeWriter>(*wb, block->cloneEmpty()); + auto write_size = native_writer->write(*block); + native_writer->flush(); + wb->finalize(); + return write_size; + LOCAL_ENGINE_JNI_METHOD_END(env, -1) +} + JNIEXPORT void Java_org_apache_gluten_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala index d23d172e46..1307351a43 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala @@ -18,6 +18,7 @@ package org.apache.gluten.extension.caller import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.streaming.StreamExecution /** * Helper API that stores information about the call site of the columnar rule. Specific columnar @@ -28,6 +29,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation trait CallerInfo { def isAqe(): Boolean def isCache(): Boolean + def isStreaming(): Boolean } object CallerInfo { @@ -36,7 +38,11 @@ object CallerInfo { override def initialValue(): Option[CallerInfo] = None } - private class Impl(override val isAqe: Boolean, override val isCache: Boolean) extends CallerInfo + private class Impl( + override val isAqe: Boolean, + override val isCache: Boolean, + override val isStreaming: Boolean + ) extends CallerInfo /* * Find the information about the caller that initiated the rule call. @@ -46,7 +52,10 @@ object CallerInfo { return localStorage.get().get } val stack = Thread.currentThread.getStackTrace - new Impl(isAqe = inAqeCall(stack), isCache = inCacheCall(stack)) + new Impl( + isAqe = inAqeCall(stack), + isCache = inCacheCall(stack), + isStreaming = inStreamingCall(stack)) } private def inAqeCall(stack: Seq[StackTraceElement]): Boolean = { @@ -57,10 +66,15 @@ object CallerInfo { stack.exists(_.getClassName.equals(InMemoryRelation.getClass.getName)) } + private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = { + stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head)) + } + /** For testing only. 
*/ - def withLocalValue[T](isAqe: Boolean, isCache: Boolean)(body: => T): T = { + def withLocalValue[T](isAqe: Boolean, isCache: Boolean, isStreaming: Boolean = false)( + body: => T): T = { val prevValue = localStorage.get() - val newValue = new Impl(isAqe, isCache) + val newValue = new Impl(isAqe, isCache, isStreaming) localStorage.set(Some(newValue)) try { body diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index c6672cdbb2..1bb5a255f5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.hive.HiveUDFTransformer import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch +import java.io.{ObjectInputStream, ObjectOutputStream} import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} @@ -727,4 +728,18 @@ trait SparkPlanExecApi { children: Seq[ExpressionTransformer], expr: Expression): ExpressionTransformer = GenericExpressionTransformer(substraitName, children, expr) + + def isSupportRDDScanExec(plan: RDDScanExec): Boolean = false + + def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer = + throw new GlutenNotSupportException("RDDScanExec is not supported") + + def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch = + throw new GlutenNotSupportException("Copying ColumnarBatch is not supported") + + def serializeColumnarBatch(output: ObjectOutputStream, batch: ColumnarBatch): Unit = + throw new GlutenNotSupportException("Serialize ColumnarBatch is not supported") + + def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch = + throw new GlutenNotSupportException("Deserialize ColumnarBatch is not supported") } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index 611b2a8ff7..750fc060cd 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.CollectLimitExec +import org.apache.spark.sql.execution.RDDScanTransformer import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -348,6 +349,9 @@ object OffloadOthers { plan.limit, plan.child ) + case plan: RDDScanExec if RDDScanTransformer.isSupportRDDScanExec(plan) => + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + RDDScanTransformer.getRDDScanTransform(plan) case p if !p.isInstanceOf[GlutenPlan] => logDebug(s"Transformation for ${p.getClass} is currently not supported.") p diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala new file mode 100644 index 0000000000..e3fc728477 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.ValidatablePlan +import org.apache.gluten.extension.columnar.transition.Convention + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} + +abstract class RDDScanTransformer( + outputAttributes: Seq[Attribute], + override val outputPartitioning: Partitioning = UnknownPartitioning(0), + override val outputOrdering: Seq[SortOrder] +) extends ValidatablePlan { + + override def rowType0(): Convention.RowType = Convention.RowType.None + override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType + override def output: Seq[Attribute] = { + outputAttributes + } + + override protected def doExecute() + : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecute().") + } + + override def children: Seq[SparkPlan] = Seq.empty +} + +object RDDScanTransformer { + def isSupportRDDScanExec(plan: RDDScanExec): Boolean = + BackendsApiManager.getSparkPlanExecApiInstance.isSupportRDDScanExec(plan) + + def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer = + BackendsApiManager.getSparkPlanExecApiInstance.getRDDScanTransform(plan) +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala new file mode 100644 index 0000000000..019100234d --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import org.apache.gluten.backendsapi.BackendsApiManager + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.vectorized.ColumnarBatch + +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + +class FakeRowEnhancement(@transient private val _batch: ColumnarBatch) + extends FakeRow(_batch) + with Serializable { + + override def copy(): InternalRow = { + val copied = BackendsApiManager.getSparkPlanExecApiInstance.copyColumnarBatch(batch) + new FakeRowEnhancement(copied) + } + + @throws(classOf[IOException]) + private def writeObject(output: ObjectOutputStream): Unit = { + BackendsApiManager.getSparkPlanExecApiInstance.serializeColumnarBatch(output, batch) + } + + @throws(classOf[IOException]) + private def readObject(input: ObjectInputStream): Unit = { + batch = BackendsApiManager.getSparkPlanExecApiInstance.deserializeColumnarBatch(input) + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 54b5a34639..d58026b123 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -64,7 +64,7 @@ case class FakeRowAdaptor(child: SparkPlan) override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow override protected def doExecute(): RDD[InternalRow] = { - doExecuteColumnar().map(cb => new FakeRow(cb)) + doExecuteColumnar().map(cb => new FakeRowEnhancement(cb)) } override def outputOrdering: Seq[SortOrder] = child match { diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 5601c2d321..0e4666c604 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -483,6 +483,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("column stats collection for null columns") .exclude("store and retrieve column stats in different time zones") .excludeGlutenTest("store and retrieve column stats in different time zones") + .excludeCH("statistics collection of a table with zero column") enableSuite[GlutenStringFunctionsSuite] .exclude("string regex_replace / regex_extract") .exclude("string overlay function") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 3436be6f85..af8c0ac201 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1788,6 +1788,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .includeCH("SPARK-16371 Do not push down filters when inner name and outer name are the same") .exclude("filter pushdown - StringPredicate") .excludeCH("filter pushdown - StringContains") + .excludeCH("SPARK-36866: filter pushdown - year-month interval") // avoid Velox compile error enableSuite( "org.apache.gluten.execution.parquet.GlutenParquetV1FilterSuite2" @@ -1809,6 +1810,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .includeCH("SPARK-16371 Do not push down filters when inner name and outer name are the same") .exclude("filter pushdown - StringPredicate") .excludeCH("filter pushdown - StringContains") + .excludeCH("SPARK-36866: filter pushdown - year-month interval") enableSuite[GlutenParquetV1PartitionDiscoverySuite] .excludeCH("Various partition value types") .excludeCH("Various inferred partition value types") @@ -2049,6 +2051,7 @@ class ClickHouseTestSettings extends BackendTestSettings { .excludeCH("store and retrieve column stats in different time zones") .excludeCH("SPARK-42777: describe column stats (min, max) for timestamp_ntz column") .excludeCH("Gluten - store and retrieve column stats in different time zones") + .excludeCH("statistics collection of a table with zero column") enableSuite[GlutenStringExpressionsSuite] .excludeCH("StringComparison") .excludeCH("Substring") diff --git a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala index 0ffd832f3b..679a69334a 100644 --- a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala +++ b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala @@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} trait IFakeRowAdaptor -class FakeRow(val batch: ColumnarBatch) extends InternalRow { +class FakeRow(@transient var batch: ColumnarBatch) extends InternalRow { override def numFields: Int = throw new UnsupportedOperationException() override def setNullAt(i: Int): Unit = throw new UnsupportedOperationException()
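
For readers of the FakeRow change just above: the batch field is now @transient, and the new FakeRowEnhancement supplies private writeObject/readObject hooks that hand ColumnarBatch (de)serialization to the backend through the new SparkPlanExecApi methods. Below is a minimal, self-contained Scala sketch of that Java-serialization delegation pattern only. The Payload, SerializableHolder, and SerializableHolderDemo names and the byte-array payload are invented stand-ins for this illustration; they are not part of the commit.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, ObjectInputStream, ObjectOutputStream}

// Stand-in for ColumnarBatch: a payload that default Java serialization cannot handle.
final class Payload(val data: Array[Byte])

// Minimal analogue of FakeRow(@transient var batch) plus FakeRowEnhancement:
// the field is @transient, and private writeObject/readObject implement a
// custom wire format instead of relying on default Java serialization.
class SerializableHolder(@transient private var payload: Payload) extends Serializable {
  def get: Payload = payload

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeInt(payload.data.length) // length prefix
    out.write(payload.data)           // raw payload bytes
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    val len = in.readInt()
    val bytes = new Array[Byte](len)
    in.readFully(bytes)
    payload = new Payload(bytes)      // rebuild the transient field on the receiving side
  }
}

object SerializableHolderDemo {
  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new SerializableHolder(new Payload("hello".getBytes("UTF-8"))))
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val restored = ois.readObject().asInstanceOf[SerializableHolder]
    println(new String(restored.get.data, "UTF-8")) // prints: hello
  }
}

The round trip only succeeds because of the custom hooks; with default serialization the transient payload would simply be dropped and come back null. This is the same mechanism FakeRowEnhancement uses to carry a ColumnarBatch through Java serialization, with the actual byte format delegated to the backend via serializeColumnarBatch/deserializeColumnarBatch.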