Shekharrajak commented on code in PR #3519:
URL: https://github.com/apache/datafusion-comet/pull/3519#discussion_r2813525711

##########
spark/src/test/scala/org/apache/spark/sql/comet/CometNativeCompaction.scala:
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileScanTask, Table}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.comet.Native
+
+/** Configuration for Iceberg table metadata passed to native code. */
+case class IcebergTableConfig(
+    table_identifier: String,
+    metadata_location: String,
+    warehouse_location: String,
+    current_snapshot_id: Option[Long],
+    file_io_properties: Map[String, String])
+
+/** File scan task configuration for native compaction. */
+case class FileScanTaskConfig(
+    file_path: String,
+    file_size_bytes: Long,
+    record_count: Long,
+    partition_path: String, // e.g., "year=2024/month=01" or "" for unpartitioned
+    partition_spec_id: Int,
+    start: Long,
+    length: Long)
+
+/** Compaction task configuration for native execution. */
+case class CompactionTaskConfig(
+    table_config: IcebergTableConfig,
+    file_scan_tasks: Seq[FileScanTaskConfig],
+    target_file_size_bytes: Long,
+    compression: String,
+    data_dir: String)
+
+/** Iceberg DataFile metadata from native compaction. */
+case class IcebergDataFileMetadata(
+    file_path: String,
+    file_format: String,
+    record_count: Long,
+    file_size_in_bytes: Long,
+    partition_json: String,
+    column_sizes: Map[Int, Long],
+    value_counts: Map[Int, Long],
+    null_value_counts: Map[Int, Long],
+    split_offsets: Seq[Long],
+    partition_spec_id: Int)
+
+/** Result of native Iceberg compaction. */
+case class IcebergCompactionResult(
+    files_to_delete: Seq[String],
+    files_to_add: Seq[IcebergDataFileMetadata],
+    total_rows: Long,
+    total_bytes_written: Long)
+
+/** Native compaction execution result. */
+case class NativeCompactionResult(
+    success: Boolean,
+    error_message: Option[String],
+    result: Option[IcebergCompactionResult])
+
+/**
+ * Native Iceberg compaction using Rust/DataFusion for scan+write, Java API for commit.
+ */
+class CometNativeCompaction(spark: SparkSession) extends Logging {
+
+  private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+  private val native = new Native()
+
+  val DEFAULT_TARGET_FILE_SIZE: Long = 128 * 1024 * 1024 // 128 MB
+  val DEFAULT_COMPRESSION: String = "zstd"
+
+  /** Rewrite data files using native scan+write with Iceberg Java API commit.
+   */
+  def rewriteDataFiles(
+      table: Table,
+      targetFileSizeBytes: Long = DEFAULT_TARGET_FILE_SIZE,
+      compression: String = DEFAULT_COMPRESSION): CompactionSummary = {
+
+    logInfo(s"Starting native compaction for table ${table.name()}")
+
+    val currentSnapshot = Option(table.currentSnapshot())
+    val currentSnapshotId = currentSnapshot.map(_.snapshotId())
+    val tableConfig = buildTableConfig(table, currentSnapshotId)
+    val fileGroups = planFileGroups(table, targetFileSizeBytes)
+
+    if (fileGroups.isEmpty) {
+      logInfo("No files to compact")
+      return CompactionSummary(
+        filesDeleted = 0,
+        filesAdded = 0,
+        bytesDeleted = 0,
+        bytesAdded = 0,
+        rowsProcessed = 0)
+    }
+
+    logInfo(s"Found ${fileGroups.size} file groups to compact")
+
+    var totalFilesDeleted = 0
+    var totalFilesAdded = 0
+    var totalBytesDeleted = 0L
+    var totalBytesAdded = 0L
+    var totalRowsProcessed = 0L
+
+    val allFilesToDelete = scala.collection.mutable.ArrayBuffer[String]()
+    val allFilesToAdd = scala.collection.mutable.ArrayBuffer[IcebergDataFileMetadata]()
+
+    for ((group, groupIndex) <- fileGroups.zipWithIndex) {
+      logInfo(
+        s"Processing file group ${groupIndex + 1}/${fileGroups.size} " +
+          s"with ${group.size} files")
+
+      val compactionConfig =
+        buildCompactionConfig(tableConfig, group, targetFileSizeBytes, compression)
+      val result = executeNativeCompaction(compactionConfig)
+
+      result match {
+        case NativeCompactionResult(true, _, Some(compactionResult)) =>
+          allFilesToDelete ++= compactionResult.files_to_delete
+          allFilesToAdd ++= compactionResult.files_to_add
+          totalFilesDeleted += compactionResult.files_to_delete.size
+          totalFilesAdded += compactionResult.files_to_add.size
+          totalRowsProcessed += compactionResult.total_rows
+          totalBytesAdded += compactionResult.total_bytes_written
+          for (task <- group) {
+            totalBytesDeleted += task.file().fileSizeInBytes()
+          }
+
+          logInfo(
+            s"Group ${groupIndex + 1} completed: " +
+              s"${compactionResult.files_to_delete.size} files deleted, " +
+              s"${compactionResult.files_to_add.size} files added")

+        case NativeCompactionResult(false, Some(error), _) =>
+          logError(s"Native compaction failed for group ${groupIndex + 1}: $error")
+          throw new RuntimeException(s"Native compaction failed: $error")
+
+        case _ =>
+          logError(s"Unexpected native compaction result for group ${groupIndex + 1}")
+          throw new RuntimeException("Unexpected native compaction result")
+      }
+    }
+
+    if (allFilesToAdd.nonEmpty) {
+      logInfo(
+        s"Committing compaction: ${allFilesToDelete.size} files to delete, " +
+          s"${allFilesToAdd.size} files to add")
+
+      val commitSuccess =
+        commitCompaction(table, allFilesToDelete.toSeq, allFilesToAdd.toSeq)
+
+      if (!commitSuccess) {
+        throw new RuntimeException("Failed to commit compaction results")
+      }
+
+      logInfo("Compaction committed successfully")
+    }
+
+    CompactionSummary(
+      filesDeleted = totalFilesDeleted,
+      filesAdded = totalFilesAdded,
+      bytesDeleted = totalBytesDeleted,
+      bytesAdded = totalBytesAdded,
+      rowsProcessed = totalRowsProcessed)
+  }
+
+  private def buildTableConfig(
+      table: Table,
+      currentSnapshotId: Option[Long]): IcebergTableConfig = {
+
+    val tableLocation = table.location()
+    val metadataLocation =
+      try {
+        table
+          .asInstanceOf[org.apache.iceberg.BaseTable]
+          .operations()
+          .current()
+          .metadataFileLocation()
+      } catch {
+        case _: Exception => s"$tableLocation/metadata/v1.metadata.json"
+      }
+    val warehouseLocation = tableLocation.substring(0, tableLocation.lastIndexOf('/'))
+    val fileIOProperties = table.properties().asScala.toMap
+
+    IcebergTableConfig(
+      table_identifier = table.name(),
+      metadata_location = metadataLocation,
+      warehouse_location = warehouseLocation,
+      current_snapshot_id = currentSnapshotId,
+      file_io_properties = fileIOProperties)
+  }
+
+  /** Plan file groups using bin-pack strategy. */

Review Comment:
   Right now, we use the bin-pack runner strategy to group small files for rewriting. We can extend this later to support other runners and planners.
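   For readers of this thread: "bin-pack" here means accumulating small-file scan tasks into groups until each group is roughly one target-sized output file. The PR's `planFileGroups` body is outside this hunk, so the following is only a minimal sketch of the idea, assuming nothing beyond the Iceberg `Table`/`FileScanTask` API already imported in the file:

   ```scala
   import scala.collection.JavaConverters._
   import scala.collection.mutable.ArrayBuffer

   import org.apache.iceberg.{FileScanTask, Table}

   object BinPackPlanningSketch {

     /** Accumulates small-file scan tasks into bins of roughly the target size. */
     def planFileGroups(table: Table, targetFileSizeBytes: Long): Seq[Seq[FileScanTask]] = {
       // Candidates for compaction: files smaller than the target output size.
       val tasks = table
         .newScan()
         .planFiles()
         .iterator()
         .asScala
         .filter(_.file().fileSizeInBytes() < targetFileSizeBytes)
         .toSeq

       val groups = ArrayBuffer[Seq[FileScanTask]]()
       val current = ArrayBuffer[FileScanTask]()
       var currentBytes = 0L

       for (task <- tasks) {
         current += task
         currentBytes += task.file().fileSizeInBytes()
         // Close the bin once it would fill one target-sized output file.
         if (currentBytes >= targetFileSizeBytes) {
           groups += current.toSeq
           current.clear()
           currentBytes = 0L
         }
       }
       // A leftover bin is only worth rewriting if it merges at least two files.
       if (current.size > 1) {
         groups += current.toSeq
       }
       groups.toSeq
     }
   }
   ```

   Other runners and planners (e.g. sort-based rewrites) would slot in by replacing this grouping step, while the native scan+write and the Java-side commit stay unchanged.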

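   Similarly, the hunk calls `commitCompaction(...)` but its body is outside this diff. For orientation only, here is a hedged sketch of how such a commit typically looks with Iceberg's Java API: `DataFile` handles are rebuilt from the metadata the native side returns and swapped in atomically via `Table.newRewrite()`. The `IcebergDataFileMetadata` case class is the one defined in this file; the signature (the PR's version takes file paths rather than `DataFile` handles), the Parquet assumption, and everything else are illustrative:

   ```scala
   import scala.collection.JavaConverters._

   import org.apache.iceberg.{DataFile, DataFiles, FileFormat, Table}

   object CommitSketch {

     /** Rebuilds DataFile handles from native metadata and commits the swap atomically. */
     def commitCompaction(
         table: Table,
         filesToDelete: Set[DataFile], // e.g. collected via group.map(_.file()) during planning
         filesToAdd: Seq[IcebergDataFileMetadata]): Unit = {
       val newFiles: Set[DataFile] = filesToAdd.map { meta =>
         DataFiles
           .builder(table.spec())
           .withPath(meta.file_path)
           .withFileSizeInBytes(meta.file_size_in_bytes)
           .withRecordCount(meta.record_count)
           .withFormat(FileFormat.PARQUET) // assumption: the native writer emits Parquet
           .build()
       }.toSet

       // RewriteFiles replaces the old files with the new ones in a single snapshot,
       // so readers never observe a partially compacted table.
       table
         .newRewrite()
         .rewriteFiles(filesToDelete.asJava, newFiles.asJava)
         .commit()
     }
   }
   ```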