This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 68d75269d [Gluten-5256][CH]optimizing table after spark restart bug
(#5258)
68d75269d is described below
commit 68d75269d8af6819a1f48da9939ef36b44ffb141
Author: Hongbin Ma <[email protected]>
AuthorDate: Wed Apr 3 13:27:17 2024 +0800
[Gluten-5256][CH]optimizing table after spark restart bug (#5258)
[Gluten-5256][CH]optimizing table after spark restart bug
---
.../delta/commands/OptimizeTableCommandBase.scala | 1 +
.../sql/delta/commands/OptimizeTableCommand.scala | 2 +
.../commands/OptimizeTableCommandOverwrites.scala | 323 ---------------------
.../spark/sql/delta/commands/CommandUtils.scala | 43 +++
.../commands/OptimizeTableCommandOverwrites.scala | 7 +-
.../GlutenClickHouseTableAfterRestart.scala | 178 ++++++++++--
6 files changed, 210 insertions(+), 344 deletions(-)
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
index bbfa2ecee..2d6ac48f5 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala
@@ -126,6 +126,7 @@ case class OptimizeTableCommand(
override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
override def run(sparkSession: SparkSession): Seq[Row] = {
+ CommandUtils.ensureClickHouseTableV2(tableId, sparkSession)
val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId,
"OPTIMIZE")
val partitionColumns = deltaLog.snapshot.metadata.partitionColumns
diff --git
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 57427957d..346943671 100644
---
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -126,6 +126,8 @@ case class OptimizeTableCommand(
override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil
override def run(sparkSession: SparkSession): Seq[Row] = {
+ CommandUtils.ensureClickHouseTableV2(tableId, sparkSession)
+
val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId,
"OPTIMIZE", options)
val txn = deltaLog.startTransaction()
diff --git
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
deleted file mode 100644
index 498b7ff4f..000000000
---
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.delta.commands
-
-import org.apache.gluten.expression.ConverterUtils
-
-import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException}
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog,
DeltaTableIdentifier, OptimisticTransaction}
-import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
-import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.CHDatasourceJniWrapper
-import org.apache.spark.sql.execution.datasources.v1.CHMergeTreeWriterInjects
-import org.apache.spark.sql.execution.datasources.v1.clickhouse._
-import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.{AddFileTags,
AddMergeTreeParts}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils}
-
-import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
-import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID,
TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import java.util.{Date, UUID}
-
-import scala.collection.mutable.ArrayBuffer
-
-object OptimizeTableCommandOverwrites extends Logging {
-
- case class TaskDescription(
- path: String,
- database: String,
- tableName: String,
- orderByKeyOption: Option[Seq[String]],
- lowCardKeyOption: Option[Seq[String]],
- minmaxIndexKeyOption: Option[Seq[String]],
- bfIndexKeyOption: Option[Seq[String]],
- setIndexKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]],
- partitionColumns: Seq[String],
- partList: Seq[String],
- tableSchema: StructType,
- clickhouseTableConfigs: Map[String, String],
- serializableHadoopConf: SerializableConfiguration,
- jobIdInstant: Long,
- partitionDir: Option[String],
- bucketDir: Option[String]
- )
-
- private def executeTask(
- description: TaskDescription,
- sparkStageId: Int,
- sparkPartitionId: Int,
- sparkAttemptNumber: Int
- ): MergeTreeWriteTaskResult = {
-
- val jobId = SparkHadoopWriterUtils.createJobID(new
Date(description.jobIdInstant), sparkStageId)
- val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
- val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
-
- // Set up the attempt context required to use in the output committer.
- val taskAttemptContext: TaskAttemptContext = {
- // Set up the configuration object
- val hadoopConf = description.serializableHadoopConf.value
- hadoopConf.set("mapreduce.job.id", jobId.toString)
- hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
- hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
- hadoopConf.setBoolean("mapreduce.task.ismap", true)
- hadoopConf.setInt("mapreduce.task.partition", 0)
-
- new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
- }
-
- try {
- Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-
- val uuid = UUID.randomUUID.toString
-
- val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
- description.path,
- description.database,
- description.tableName,
- description.orderByKeyOption,
- description.lowCardKeyOption,
- description.minmaxIndexKeyOption,
- description.bfIndexKeyOption,
- description.setIndexKeyOption,
- description.primaryKeyOption,
- description.partitionColumns,
- description.partList,
- ConverterUtils.convertNamedStructJson(description.tableSchema),
- description.clickhouseTableConfigs,
- description.tableSchema.toAttributes
- )
-
- val datasourceJniWrapper = new CHDatasourceJniWrapper()
- val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
- planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
- description.partitionDir.getOrElse(""),
- description.bucketDir.getOrElse("")
- )
- if (returnedMetrics != null && returnedMetrics.nonEmpty) {
- val addFiles = AddFileTags.partsMetricsToAddFile(
- description.database,
- description.tableName,
- description.path,
- returnedMetrics,
- Seq(Utils.localHostName()))
-
- val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
- // committer.commitTask(taskAttemptContext)
- new TaskCommitMessage(addFiles.toSeq)
- }
-
-// val summary = MergeTreeExecutedWriteSummary(
-// updatedPartitions = updatedPartitions.toSet,
-// stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
- MergeTreeWriteTaskResult(taskCommitMessage, null)
- } else {
- throw new IllegalStateException()
- }
- })(
- catchBlock = {
- // If there is an error, abort the task
- logError(s"Job $jobId aborted.")
- },
- finallyBlock = {})
- } catch {
- case e: FetchFailedException =>
- throw e
- case f: FileAlreadyExistsException if
SQLConf.get.fastFailFileFormatOutput =>
- // If any output file to write already exists, it does not make sense
to re-run this task.
- // We throw the exception and let Executor throw ExceptionFailure to
abort the job.
- throw new TaskOutputFileAlreadyExistException(f)
- case t: Throwable =>
- throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t)
- }
-
- }
-
- def runOptimizeBinJobClickhouse(
- txn: OptimisticTransaction,
- partitionValues: Map[String, String],
- bucketNum: String,
- bin: Seq[AddFile],
- maxFileSize: Long): Seq[FileAction] = {
- val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
-
- val sparkSession = SparkSession.getActiveSession.get
-
- val rddWithNonEmptyPartitions =
- sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
-
- val jobIdInstant = new Date().getTime
- val ret = new
Array[MergeTreeWriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
-
- val serializableHadoopConf = new SerializableConfiguration(
- sparkSession.sessionState.newHadoopConfWithOptions(
- txn.metadata.configuration ++ txn.deltaLog.options))
-
- val partitionDir = if (tableV2.partitionColumns.isEmpty) {
- None
- } else {
- Some(tableV2.partitionColumns.map(c => c + "=" +
partitionValues(c)).mkString("/"))
- }
-
- val bucketDir = if (tableV2.bucketOption.isEmpty) {
- None
- } else {
- Some(bucketNum)
- }
-
- val description = TaskDescription.apply(
- txn.deltaLog.dataPath.toString,
- tableV2.dataBaseName,
- tableV2.tableName,
- tableV2.orderByKeyOption,
- tableV2.lowCardKeyOption,
- tableV2.minmaxIndexKeyOption,
- tableV2.bfIndexKeyOption,
- tableV2.setIndexKeyOption,
- tableV2.primaryKeyOption,
- tableV2.partitionColumns,
- bin.map(_.asInstanceOf[AddMergeTreeParts].name),
- tableV2.schema(),
- tableV2.clickhouseTableConfigs,
- serializableHadoopConf,
- jobIdInstant,
- partitionDir,
- bucketDir
- )
- sparkSession.sparkContext.runJob(
- rddWithNonEmptyPartitions,
- (taskContext: TaskContext, _: Iterator[InternalRow]) => {
- executeTask(
- description,
- taskContext.stageId(),
- taskContext.partitionId(),
- taskContext.taskAttemptId().toInt & Integer.MAX_VALUE
- )
- },
- rddWithNonEmptyPartitions.partitions.indices,
- (index, res: MergeTreeWriteTaskResult) => {
- ret(index) = res
- }
- )
-
- val addFiles = ret
- .flatMap(_.commitMsg.obj.asInstanceOf[Seq[AddFile]])
- .toSeq
-
- val removeFiles =
- bin.map(f => f.removeWithTimestamp(new SystemClock().getTimeMillis(),
dataChange = false))
- addFiles ++ removeFiles
-
- }
-
- private def isDeltaTable(spark: SparkSession, tableName: TableIdentifier):
Boolean = {
- val catalog = spark.sessionState.catalog
- val tableIsNotTemporaryTable = !catalog.isTempView(tableName)
- val tableExists = {
- (tableName.database.isEmpty ||
catalog.databaseExists(tableName.database.get)) &&
- catalog.tableExists(tableName)
- }
- tableIsNotTemporaryTable && tableExists && catalog
- .getTableMetadata(tableName)
- .provider
- .get
- .toLowerCase()
- .equals("clickhouse")
- }
-
- def getDeltaLogClickhouse(
- spark: SparkSession,
- path: Option[String],
- tableIdentifier: Option[TableIdentifier],
- operationName: String,
- hadoopConf: Map[String, String] = Map.empty): DeltaLog = {
- val tablePath =
- if (tableIdentifier.nonEmpty && isDeltaTable(spark,
tableIdentifier.get)) {
- val sessionCatalog = spark.sessionState.catalog
- lazy val metadata =
sessionCatalog.getTableMetadata(tableIdentifier.get)
- new Path(metadata.location)
- } else {
- throw new UnsupportedOperationException("OPTIMIZE is ony supported for
clickhouse tables")
- }
-
- val startTime = Some(System.currentTimeMillis)
- val deltaLog = DeltaLog.forTable(spark, tablePath, hadoopConf)
- if (deltaLog.update(checkIfUpdatedSinceTs = startTime).version < 0) {
- throw DeltaErrors.notADeltaTableException(
- operationName,
- DeltaTableIdentifier(path, tableIdentifier))
- }
- deltaLog
- }
-
- def groupFilesIntoBinsClickhouse(
- partitionsToCompact: Seq[((String, Map[String, String]), Seq[AddFile])],
- maxTargetFileSize: Long): Seq[((String, Map[String, String]),
Seq[AddFile])] = {
- partitionsToCompact.flatMap {
- case (partition, files) =>
- val bins = new ArrayBuffer[Seq[AddFile]]()
-
- val currentBin = new ArrayBuffer[AddFile]()
- var currentBinSize = 0L
-
- files.sortBy(_.size).foreach {
- file =>
- // Generally, a bin is a group of existing files, whose total size
does not exceed the
- // desired maxFileSize. They will be coalesced into a single
output file.
- // However, if isMultiDimClustering = true, all files in a
partition will be read by the
- // same job, the data will be range-partitioned and numFiles =
totalFileSize / maxFileSize
- // will be produced. See below.
-
- // isMultiDimClustering is always false for Gluten Clickhouse for
now
- if (file.size + currentBinSize > maxTargetFileSize /*&&
!isMultiDimClustering */ ) {
- bins += currentBin.toVector
- currentBin.clear()
- currentBin += file
- currentBinSize = file.size
- } else {
- currentBin += file
- currentBinSize += file.size
- }
- }
-
- if (currentBin.nonEmpty) {
- bins += currentBin.toVector
- }
-
- bins
- .map(b => (partition, b))
- // select bins that have at least two files or in case of multi-dim
clustering
- // select all bins
- .filter(_._2.size > 1 /*|| isMultiDimClustering*/ )
- }
- }
-}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala
new file mode 100644
index 000000000..262c37eff
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.commands
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.Identifier
+
+object CommandUtils {
+ // Ensure ClickHouseTableV2 table exists
+ def ensureClickHouseTableV2(
+ tableId: Option[TableIdentifier],
+ sparkSession: SparkSession): Unit = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ if (tableId.isEmpty) {
+ throw new UnsupportedOperationException("Current command requires table
identifier.")
+ }
+ // If user comes into this function without previously triggering loadTable
+ // (which creates ClickhouseTableV2), we have to load the table manually
+ // Notice: Multi-catalog case is not well considered!
+
sparkSession.sessionState.catalogManager.currentCatalog.asTableCatalog.loadTable(
+ Identifier.of(
+ Array(
+ tableId.get.database.getOrElse(
+ sparkSession.sessionState.catalogManager.currentNamespace.head)),
+ tableId.get.table)
+ )
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
similarity index 98%
rename from
backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
rename to
backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index 498b7ff4f..5aeafbf81 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -294,11 +294,12 @@ object OptimizeTableCommandOverwrites extends Logging {
// Generally, a bin is a group of existing files, whose total size
does not exceed the
// desired maxFileSize. They will be coalesced into a single
output file.
// However, if isMultiDimClustering = true, all files in a
partition will be read by the
- // same job, the data will be range-partitioned and numFiles =
totalFileSize / maxFileSize
+ // same job, the data will be range-partitioned and
+ // numFiles = totalFileSize / maxFileSize
// will be produced. See below.
// isMultiDimClustering is always false for Gluten Clickhouse for
now
- if (file.size + currentBinSize > maxTargetFileSize /*&&
!isMultiDimClustering */ ) {
+ if (file.size + currentBinSize > maxTargetFileSize /* &&
!isMultiDimClustering */ ) {
bins += currentBin.toVector
currentBin.clear()
currentBin += file
@@ -317,7 +318,7 @@ object OptimizeTableCommandOverwrites extends Logging {
.map(b => (partition, b))
// select bins that have at least two files or in case of multi-dim
clustering
// select all bins
- .filter(_._2.size > 1 /*|| isMultiDimClustering*/ )
+ .filter(_._2.size > 1 /* || isMultiDimClustering */ )
}
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index c6abbd444..e751c2fda 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -30,6 +30,7 @@ import java.io.File
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
+// This suite is to make sure clickhouse commands work well even after spark
restart
class GlutenClickHouseTableAfterRestart
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -73,7 +74,9 @@ class GlutenClickHouseTableAfterRestart
override protected def initializeSession(): Unit = {
if (_hiveSpark == null) {
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
+ val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" +
current_db_num
+ current_db_num += 1
+
_hiveSpark = SparkSession
.builder()
.config(sparkConf)
@@ -108,6 +111,8 @@ class GlutenClickHouseTableAfterRestart
}
}
+ var current_db_num: Int = 0
+
test("test mergetree after restart") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree;
@@ -182,13 +187,157 @@ class GlutenClickHouseTableAfterRestart
assert(stats2.missCount() - oldMissingCount2 == 0)
}
+ val oldMissingCount1 =
ClickhouseSnapshot.deltaScanCache.stats().missCount()
+ val oldMissingCount2 =
ClickhouseSnapshot.addFileToAddMTPCache.stats().missCount()
+
+ restartSpark()
+
+ runTPCHQueryBySQL(1, sqlStr)(_ => {})
+
+ // after restart, additionally check stats of delta scan cache
+ val stats1 = ClickhouseSnapshot.deltaScanCache.stats()
+ assert(stats1.missCount() - oldMissingCount1 == 1)
+ val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats()
+ assert(stats2.missCount() - oldMissingCount2 == 6)
+
+ }
+
+ test("test optimize after restart") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS table_restart_optimize;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS table_restart_optimize (id
bigint, name string)
+ |USING clickhouse
+ |LOCATION '$basePath/table_restart_optimize'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table table_restart_optimize values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+ // second file
+ spark.sql(s"""
+ | insert into table table_restart_optimize values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+
+ restartSpark()
+
+ spark.sql("optimize table_restart_optimize")
+ assert(spark.sql("select count(*) from
table_restart_optimize").collect().apply(0).get(0) == 4)
+ }
+
+ test("test vacuum after restart") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS table_restart_vacuum;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS table_restart_vacuum (id bigint,
name string)
+ |USING clickhouse
+ |LOCATION '$basePath/table_restart_vacuum'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table table_restart_vacuum values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+ // second file
+ spark.sql(s"""
+ | insert into table table_restart_vacuum values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+
+ spark.sql("optimize table_restart_vacuum")
+
+ restartSpark()
+
+ spark.sql("set spark.gluten.enabled=false")
+ spark.sql("vacuum table_restart_vacuum")
+ spark.sql("set spark.gluten.enabled=true")
+
+ assert(spark.sql("select count(*) from
table_restart_vacuum").collect().apply(0).get(0) == 4)
+ }
+
+ test("test update after restart") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS table_restart_update;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS table_restart_update (id bigint,
name string)
+ |USING clickhouse
+ |LOCATION '$basePath/table_restart_update'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table table_restart_update values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+ // second file
+ spark.sql(s"""
+ | insert into table table_restart_update values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+
+ restartSpark()
+
+ spark.sql("update table_restart_update set name = 'tom' where id = 1")
+
+ assert(spark.sql("select count(*) from
table_restart_update").collect().apply(0).get(0) == 4)
+ }
+
+ test("test delete after restart") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS table_restart_delete;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS table_restart_delete (id bigint,
name string)
+ |USING clickhouse
+ |LOCATION '$basePath/table_restart_delete'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table table_restart_delete values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+ // second file
+ spark.sql(s"""
+ | insert into table table_restart_delete values (1,"tom"),
(2, "jim")
+ |""".stripMargin)
+
+ restartSpark()
+
+ spark.sql("delete from table_restart_delete where id = 1")
+
+ assert(spark.sql("select count(*) from
table_restart_delete").collect().apply(0).get(0) == 2)
+ }
+
+ test("test drop after restart") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS table_restart_drop;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS table_restart_drop (id bigint,
name string)
+ |USING clickhouse
+ |LOCATION '$basePath/table_restart_drop'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table table_restart_drop values (1,"tom"), (2,
"jim")
+ |""".stripMargin)
+ // second file
+ spark.sql(s"""
+ | insert into table table_restart_drop values (1,"tom"), (2,
"jim")
+ |""".stripMargin)
+
+ restartSpark()
+
+ spark.sql("drop table table_restart_drop")
+ }
+
+ private def restartSpark(): Unit = {
// now restart
ClickHouseTableV2.clearCache()
ClickhouseSnapshot.clearAllFileStatusCache()
- val oldMissingCount1 =
ClickhouseSnapshot.deltaScanCache.stats().missCount()
- val oldMissingCount2 =
ClickhouseSnapshot.addFileToAddMTPCache.stats().missCount()
-
val session = getActiveSession.orElse(getDefaultSession)
if (session.isDefined) {
session.get.stop()
@@ -196,29 +345,22 @@ class GlutenClickHouseTableAfterRestart
SparkSession.clearDefaultSession()
}
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
+ val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_"
// use metastore_db2 to avoid issue: "Another instance of Derby may have
already booted the database"
- val destDir = new File(hiveMetaStoreDB + "2")
+ val destDir = new File(hiveMetaStoreDB + current_db_num)
destDir.mkdirs()
- FileUtils.copyDirectory(new File(hiveMetaStoreDB), destDir)
+ FileUtils.copyDirectory(new File(hiveMetaStoreDB + (current_db_num - 1)),
destDir)
_hiveSpark = null
_hiveSpark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
- .config("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=${hiveMetaStoreDB}2")
+ .config(
+ "javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$hiveMetaStoreDB$current_db_num")
.master("local[2]")
.getOrCreate()
-
- runTPCHQueryBySQL(1, sqlStr)(_ => {})
-
- // after restart, additionally check stats of delta scan cache
- val stats1 = ClickhouseSnapshot.deltaScanCache.stats()
- assert(stats1.missCount() - oldMissingCount1 == 1)
- val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats()
- assert(stats2.missCount() - oldMissingCount2 == 6)
-
+ current_db_num += 1
}
-
}
// scalastyle:on line.size.limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]