CodiumAI-Agent commented on PR #9520: URL: https://github.com/apache/incubator-gluten/pull/9520#issuecomment-2853277311
## PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

<table>
<tr><td>

**🎫 Ticket compliance analysis ✅**

**[9517](https://github.com/apache/incubator-gluten/issues/9517) - PR Code Verified**

Compliant requirements:

- Provide a configuration option to disable the local disk cache for MergeTree
- Ensure tables on HDFS and Minio can be created and written without a cache disk
- Update tests to cover scenarios with the disk cache disabled

Requires further human verification:

- Manual end-to-end testing on actual HDFS and S3/Minio clusters to verify cache-less writes

</td></tr>
<tr><td>⏱️ <strong>Estimated effort to review</strong>: 4 🔵🔵🔵🔵⚪</td></tr>
<tr><td>🧪 <strong>PR contains tests</strong></td></tr>
<tr><td>🔒 <strong>No security concerns identified</strong></td></tr>
<tr><td>⚡ <strong>Recommended focus areas for review</strong><br><br>
<details><summary><a href='https://github.com/apache/incubator-gluten/pull/9520/files#diff-ac385e1de3d82890f3ff2c3a30f087563105c87ace7331edf1036d1057266889R85-R131'><strong>Configuration Application</strong></a> Verify that `builder.build(conf)` and `setStoreConfig` correctly return and mutate the SparkConf, and that the `conf.setCHConfig` calls apply the intended ClickHouse settings.
</summary>

```scala
def build(conf: SparkConf): SparkConf = {
  val prefix = extractStorageType(diskType)
  val disk = if (useRocksDB) s"${prefix}2" else prefix
  val disk_cache = if (useRocksDB) s"${prefix}_cache2" else s"${prefix}_cache"
  val main_disk = if (useDiskcache) disk_cache else disk
  require(ENDPOINT != null, "ENDPOINT is null")
  require(METADATA_PATH != null, "METADATA_PATH is null")
  val settings = mutable.ArrayBuffer[(String, String)]()
  settings.appendAll(
    Seq(
      s"storage_configuration.disks.$disk.type" -> diskType,
      s"storage_configuration.disks.$disk.endpoint" -> s"$ENDPOINT",
      s"storage_configuration.disks.$disk.metadata_path" -> METADATA_PATH
    ))
  if (ak != null && sk != null) {
    settings.appendAll(
      Seq(
        s"storage_configuration.disks.$disk.access_key_id" -> ak,
        s"storage_configuration.disks.$disk.secret_access_key" -> sk
      ))
  }
  if (useDiskcache) {
    require(CACHE_PATH != null, "CACHE_PATH is null")
    settings.appendAll(
      Seq(
        s"storage_configuration.disks.$disk_cache.type" -> "cache",
        s"storage_configuration.disks.$disk_cache.disk" -> disk,
        s"storage_configuration.disks.$disk_cache.path" -> CACHE_PATH,
        s"storage_configuration.disks.$disk_cache.max_size" -> "10Gi"
      ))
  }
  settings.appendAll(
    Seq(
      s"storage_configuration.policies.$policyName.volumes" -> "main",
      s"storage_configuration.policies.$policyName.volumes.main.disk" -> main_disk
    ))
  if (useRocksDB) {
    settings.append(s"storage_configuration.disks.$disk.metadata_type" -> "rocksdb")
  }
  settings.foreach(setting => conf.setCHConfig(setting))
  conf
}
```

</details>
<details><summary><a href='https://github.com/apache/incubator-gluten/pull/9520/files#diff-9c9cfdcecec7a203ecde18cc51c33062b5d458a815e86a286dfcd14ae986f908R95-R115'><strong>Disk Type Builder</strong></a> Ensure the new `StoreConfigBuilder` integration for S3 (with and without cache) sets all needed properties and returns the updated SparkConf for both the cache and nocache policies.
</summary>

```scala
    .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
}

def builder(policyName: String): StoreConfigBuilder =
  new StoreConfigBuilder(policyName, GlutenObjectStorageConfig.S3_DISK_TYPE)

def setStoreConfig(conf: SparkConf, BUCKET_NAME: String): SparkConf = {
  builder(STORE_POLICY)
    .withEndpoint(s"$MINIO_ENDPOINT$BUCKET_NAME/")
    .withMetadataPath(S3_METADATA_PATH)
    .withCachePath(S3_CACHE_PATH)
    .withAKSK(S3_ACCESS_KEY, S3_SECRET_KEY)
    .build(conf)
  builder(STORE_POLICY_NOCACHE)
    .withEndpoint(s"$MINIO_ENDPOINT$BUCKET_NAME/")
    .withMetadataPath(S3_METADATA_PATH)
    .withDiskcache(false)
    .withAKSK(S3_ACCESS_KEY, S3_SECRET_KEY)
    .build(conf)
}
```

</details>
<details><summary><a href='https://github.com/apache/incubator-gluten/pull/9520/files#diff-8b7d8a6024cef52679b24e9e4a7bbb15770302819ad9a9b14443524cf9d4a845R106-R121'><strong>Default Behavior Change</strong></a> Check that the new `MERGE_AFTER_INSERT` and `INSERT_WITHOUT_LOCAL_STORAGE` defaults align with existing workflows and do not break other MergeTree write paths.</summary>

```scala
val MERGE_AFTER_INSERT =
  buildConf(runtimeSettings("mergetree.merge_after_insert"))
    .doc(s"""Merge after insert in each task.
            |Set to false If DeltaOptimizedWriterTransformer is used
            |""".stripMargin)
    .booleanConf
    .createWithDefault(true)

val INSERT_WITHOUT_LOCAL_STORAGE =
  buildConf(runtimeSettings("mergetree.insert_without_local_storage"))
    .doc(s"""When insert into remote storage, don't write to local temporary storage first.
            |Set to true If DeltaOptimizedWriterTransformer is used
            |""".stripMargin)
    .booleanConf
    .createWithDefault(false)
}
```

</details>
</td></tr>
</table>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
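As a sketch of what the Configuration Application focus area produces: assuming an illustrative policy name `s3_main_nocache`, disk prefix `s3`, and placeholder endpoint, path, and credential values (none of these literals come from the PR), the no-cache branch of `build(conf)` with `useDiskcache = false` and `useRocksDB = false` would emit settings along these lines:

```properties
# Hypothetical flattened output of build(conf); keys follow the build method
# quoted above, values are placeholders for illustration only.
storage_configuration.disks.s3.type=s3
storage_configuration.disks.s3.endpoint=http://127.0.0.1:9000/test-bucket/
storage_configuration.disks.s3.metadata_path=/tmp/metadata/s3/
storage_configuration.disks.s3.access_key_id=minioadmin
storage_configuration.disks.s3.secret_access_key=minioadmin
storage_configuration.policies.s3_main_nocache.volumes=main
storage_configuration.policies.s3_main_nocache.volumes.main.disk=s3
```

Note that no `_cache` disk or `cache`-type entry appears, so the policy's main volume points straight at the object-store disk; each pair is passed through `conf.setCHConfig`, which presumably prefixes the key with the Gluten ClickHouse backend namespace.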
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
