mao-liu opened a new issue, #6565: URL: https://github.com/apache/paimon/issues/6565
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

Paimon 1.2

### Compute Engine

Flink 1.20

[test_logs.txt](https://github.com/user-attachments/files/23430526/test_logs.txt)

### Minimal reproduce step

```kotlin
package com.atlassian.plato.data.job.ingest.sink

import io.kotest.core.spec.style.StringSpec
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.StringData
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.paimon.flink.FlinkConnectorOptions
import org.apache.paimon.flink.sink.FlinkSinkBuilder
import org.apache.paimon.fs.local.LocalFileIO
import org.apache.paimon.schema.Schema
import org.apache.paimon.schema.SchemaManager
import org.apache.paimon.table.CatalogEnvironment
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.FileStoreTableFactory
import org.apache.paimon.types.DataTypes
import java.io.File
import org.apache.paimon.fs.Path as PaimonPath
import org.apache.paimon.options.Options as PaimonOptions

class PaimonCommitterResourcesTest : StringSpec() {
    private val basePath = "/tmp/paimon_sink_commiter_resources_test"

    private val paimonOptions = PaimonOptions(
        mapOf(
            FlinkConnectorOptions.SINK_COMMITTER_CPU.key() to "2",
            FlinkConnectorOptions.SINK_COMMITTER_MEMORY.key() to "2000 mb",
        ),
    )

    private val miniFlinkCluster = MiniClusterWithClientResource(
        MiniClusterResourceConfiguration
            .Builder()
            .setNumberSlotsPerTaskManager(1)
            .setNumberTaskManagers(2)
            .build(),
    )

    init {
        beforeSpec {
            miniFlinkCluster.before()
            File(basePath).deleteRecursively()
        }

        afterSpec {
            miniFlinkCluster.after()
            File(basePath).deleteRecursively()
        }

        "streaming mode sets global committer resources" {
            val tablePath = "$basePath/test_streaming"
            val table = createTestPaimonTable(tablePath)

            val env = StreamExecutionEnvironment
                .getExecutionEnvironment()
                .setParallelism(2)
                .setRuntimeMode(RuntimeExecutionMode.STREAMING)

            writeRecords(env, table)

            // from logs
            // [flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot for c56c8f145715dcc5253b375dc2bff016 with resources ResourceProfile{cpuCores=2, taskHeapMemory=1.953gb (2097152000 bytes), taskOffHeapMemory=0 bytes, managedMemory=0 bytes, networkMemory=320.000kb (327680 bytes)}.
            // [flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global Committer : test_streaming -> end: Writer (1/1) (attempt #0) with attempt id f29d6a3854e6e59149aa4d5bb957c66f_d14a00a5530c873d55f22dd652832843_0_0 and vertex id d14a00a5530c873d55f22dd652832843_0 to fa43ad1b-8e10-4190-a2f5-d71a4c6aa939 @ localhost (dataPort=63616) with allocation id c56c8f145715dcc5253b375dc2bff016
        }

        "batch mode sets global committer resources" {
            val tablePath = "$basePath/test_batch"
            val table = createTestPaimonTable(tablePath)

            val batchConfiguration = Configuration.fromMap(
                mapOf(
                    "fine-grained.shuffle-mode.all-blocking" to "true",
                ),
            )
            val env = StreamExecutionEnvironment
                .getExecutionEnvironment(batchConfiguration)
                .setParallelism(2)
                .setRuntimeMode(RuntimeExecutionMode.BATCH)

            writeRecords(env, table)

            // from logs
            // [flink-pekko.actor.default-dispatcher-7] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot for 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35 with resources ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=80.000mb (83886080 bytes), networkMemory=64.000mb (67108864 bytes)}.
            // [flink-pekko.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Collection Source -> Map (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
            // [flink-pekko.actor.default-dispatcher-8] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying dynamic-bucket-assigner (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_9dd63673dd41ea021b896d5203f3ba7c_0_0 and vertex id 9dd63673dd41ea021b896d5203f3ba7c_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
            // [flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Writer : test_batch (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_1a936cb48657826a536f331e9fb33b5e_0_0 and vertex id 1a936cb48657826a536f331e9fb33b5e_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
            // [flink-pekko.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global Committer : test_batch -> end: Writer (1/1) (attempt #0) with attempt id 5964ede24e0614729c00b730978155d0_d14a00a5530c873d55f22dd652832843_0_0 and vertex id d14a00a5530c873d55f22dd652832843_0 to 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
        }
    }

    private fun createTestPaimonTable(tablePath: String): FileStoreTable {
        val paimonPath = PaimonPath(tablePath)
        val fileIO = LocalFileIO.create()

        val schemaBuilder = Schema.newBuilder()
        schemaBuilder.primaryKey("partition_key", "primary_key")
        schemaBuilder.partitionKeys("partition_key")
        schemaBuilder.column("partition_key", DataTypes.STRING())
        schemaBuilder.column("primary_key", DataTypes.STRING())
        schemaBuilder.column("value", DataTypes.STRING())

        val schemaManager = SchemaManager(fileIO, paimonPath)
        schemaManager.createTable(schemaBuilder.build(), true)

        return FileStoreTableFactory.create(
            fileIO,
            paimonPath,
            schemaManager.latest().get(),
            paimonOptions,
            CatalogEnvironment.empty(),
        )
    }

    private fun writeRecords(
        env: StreamExecutionEnvironment,
        table: FileStoreTable,
    ) {
        val testRowData = listOf(
            createRowData("a", "aa", "aaaa"),
            createRowData("a", "ab", "abab"),
            createRowData("b", "bb", "bbbb"),
        )

        FlinkSinkBuilder(table)
            .forRowData(
                env
                    .fromData(testRowData)
                    .returns(TypeInformation.of(RowData::class.java)),
            ).build()

        env.execute("write")
    }

    private fun createRowData(
        partitionKey: String,
        primaryKey: String,
        value: String,
    ): RowData {
        val rowData = GenericRowData(3)
        rowData.setField(0, StringData.fromString(partitionKey))
        rowData.setField(1, StringData.fromString(primaryKey))
        rowData.setField(2, StringData.fromString(value))
        return rowData
    }
}
```

### What doesn't meet your expectations?

The settings `sink.committer-cpu` and `sink.committer-memory` are very important for compaction when the table and the committables are very large.

When these values are set in a streaming job, the resource configuration is applied correctly and the global committer gets the additional resources. When they are set in a batch job, the global committer is instead allocated to an existing slot and does not get the specified resources.

This behaviour is reproducible on the Flink local MiniCluster; see the attached logs and the inline comments in the code above. Tracing the allocation id of the Global Committer operator shows that it receives the specified resources in streaming mode, but not in batch mode.
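For context, my working assumption (not verified against the Paimon sink code) is that these options are translated into a dedicated Flink slot sharing group with a fine-grained resource spec for the committer vertex. Below is a minimal Kotlin sketch of the spec I would expect that slot to request, matching the streaming-mode log above; the group name and the standalone `main` are purely illustrative:

```kotlin
import org.apache.flink.api.common.operators.SlotSharingGroup

fun main() {
    // Sketch only: the fine-grained resource spec that should correspond to
    // sink.committer-cpu = 2 and sink.committer-memory = 2000 mb.
    // "paimon-committer" is a hypothetical group name.
    val committerGroup: SlotSharingGroup = SlotSharingGroup
        .newBuilder("paimon-committer")
        .setCpuCores(2.0)          // sink.committer-cpu
        .setTaskHeapMemoryMB(2000) // sink.committer-memory; 2000 MB = 1.953 GiB, as in the streaming log
        .build()

    // An operator placed in this group (via slotSharingGroup(committerGroup))
    // should only be deployed into a slot matching this resource profile.
    println(committerGroup.name)
}
```

In streaming mode the allocated slot matches this profile exactly (`cpuCores=2, taskHeapMemory=1.953gb`), so the translation itself appears to work; the question is why the batch scheduler reuses a default slot instead of requesting one that matches the declared profile.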
### Anything else?

We are handling very large tables in our jobs. Since we have multiple writers, we need to rely on dedicated compaction jobs running in batch mode. We use high writer parallelism (scale wide), and we need to allocate more resources to the global committer (scale tall).

We have found that full compaction can struggle to complete in the Global Committer step if the committable gets too large (e.g. if compaction hasn't run frequently enough). Sometimes the Global Committer takes longer than the compaction writer step, leading to very poor resource efficiency.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
