mao-liu opened a new issue, #6565:
URL: https://github.com/apache/paimon/issues/6565

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Paimon version
   
   Paimon 1.2
   
   ### Compute Engine
   
   Flink 1.20
   
   
[test_logs.txt](https://github.com/user-attachments/files/23430526/test_logs.txt)
   
   ### Minimal reproduce step
   
   ```kotlin
   package com.atlassian.plato.data.job.ingest.sink
   
   import io.kotest.core.spec.style.StringSpec
   import org.apache.flink.api.common.RuntimeExecutionMode
   import org.apache.flink.api.common.typeinfo.TypeInformation
   import org.apache.flink.configuration.Configuration
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
   import org.apache.flink.table.data.GenericRowData
   import org.apache.flink.table.data.RowData
   import org.apache.flink.table.data.StringData
   import org.apache.flink.test.util.MiniClusterWithClientResource
   import org.apache.paimon.flink.FlinkConnectorOptions
   import org.apache.paimon.flink.sink.FlinkSinkBuilder
   import org.apache.paimon.fs.local.LocalFileIO
   import org.apache.paimon.schema.Schema
   import org.apache.paimon.schema.SchemaManager
   import org.apache.paimon.table.CatalogEnvironment
   import org.apache.paimon.table.FileStoreTable
   import org.apache.paimon.table.FileStoreTableFactory
   import org.apache.paimon.types.DataTypes
   import java.io.File
   import org.apache.paimon.fs.Path as PaimonPath
   import org.apache.paimon.options.Options as PaimonOptions
   
   class PaimonCommitterResourcesTest : StringSpec() {
   
       private val basePath = "/tmp/paimon_sink_commiter_resources_test"
       private val paimonOptions =
           PaimonOptions(
               mapOf(
                   FlinkConnectorOptions.SINK_COMMITTER_CPU.key() to "2",
                   FlinkConnectorOptions.SINK_COMMITTER_MEMORY.key() to "2000 mb",
               ),
           )
   
       private val miniFlinkCluster =
           MiniClusterWithClientResource(
               MiniClusterResourceConfiguration
                   .Builder()
                   .setNumberSlotsPerTaskManager(1)
                   .setNumberTaskManagers(2)
                   .build(),
           )
   
       init {
           beforeSpec {
               miniFlinkCluster.before()
               File(basePath).deleteRecursively()
           }
   
           afterSpec {
               miniFlinkCluster.after()
               File(basePath).deleteRecursively()
           }
   
           "streaming mode sets global committer resources" {
               val tablePath = "$basePath/test_streaming"
   
               val table = createTestPaimonTable(tablePath)
               val env =
                   StreamExecutionEnvironment
                       .getExecutionEnvironment()
                       .setParallelism(2)
                       .setRuntimeMode(RuntimeExecutionMode.STREAMING)
   
               writeRecords(env, table)
   
               // from logs
               // [flink-pekko.actor.default-dispatcher-6] INFO
               // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot
               // for c56c8f145715dcc5253b375dc2bff016 with resources ResourceProfile{cpuCores=2,
               // taskHeapMemory=1.953gb (2097152000 bytes), taskOffHeapMemory=0 bytes,
               // managedMemory=0 bytes, networkMemory=320.000kb (327680 bytes)}.
               // [flink-pekko.actor.default-dispatcher-6] INFO
               // org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global
               // Committer : test_streaming -> end: Writer (1/1) (attempt #0) with attempt id
               // f29d6a3854e6e59149aa4d5bb957c66f_d14a00a5530c873d55f22dd652832843_0_0 and
               // vertex id d14a00a5530c873d55f22dd652832843_0 to
               // fa43ad1b-8e10-4190-a2f5-d71a4c6aa939 @ localhost (dataPort=63616) with
               // allocation id c56c8f145715dcc5253b375dc2bff016
           }
   
           "batch mode sets global committer resources" {
               val tablePath = "$basePath/test_batch"
   
               val table = createTestPaimonTable(tablePath)
   
               val batchConfiguration =
                   Configuration.fromMap(
                       mapOf(
                           "fine-grained.shuffle-mode.all-blocking" to "true",
                       ),
                   )
   
               val env =
                   StreamExecutionEnvironment
                       .getExecutionEnvironment(batchConfiguration)
                       .setParallelism(2)
                       .setRuntimeMode(RuntimeExecutionMode.BATCH)
   
               writeRecords(env, table)
   
               // from logs
               // [flink-pekko.actor.default-dispatcher-7] INFO
               // org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot
               // for 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35 with resources
               // ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
               // taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=80.000mb
               // (83886080 bytes), networkMemory=64.000mb (67108864 bytes)}.
               // [flink-pekko.actor.default-dispatcher-4] INFO
               // org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source:
               // Collection Source -> Map (1/1) (attempt #0) with attempt id
               // 5964ede24e0614729c00b730978155d0_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and
               // vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to
               // 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with
               // allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
               // [flink-pekko.actor.default-dispatcher-8] INFO
               // org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying
               // dynamic-bucket-assigner (1/1) (attempt #0) with attempt id
               // 5964ede24e0614729c00b730978155d0_9dd63673dd41ea021b896d5203f3ba7c_0_0 and
               // vertex id 9dd63673dd41ea021b896d5203f3ba7c_0 to
               // 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with
               // allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
               // [flink-pekko.actor.default-dispatcher-6] INFO
               // org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Writer :
               // test_batch (1/1) (attempt #0) with attempt id
               // 5964ede24e0614729c00b730978155d0_1a936cb48657826a536f331e9fb33b5e_0_0 and
               // vertex id 1a936cb48657826a536f331e9fb33b5e_0 to
               // 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with
               // allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
               // [flink-pekko.actor.default-dispatcher-6] INFO
               // org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global
               // Committer : test_batch -> end: Writer (1/1) (attempt #0) with attempt id
               // 5964ede24e0614729c00b730978155d0_d14a00a5530c873d55f22dd652832843_0_0 and
               // vertex id d14a00a5530c873d55f22dd652832843_0 to
               // 18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with
               // allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35
   
           }
       }
   
       private fun createTestPaimonTable(tablePath: String): FileStoreTable {
           val paimonPath = PaimonPath(tablePath)
           val fileIO = LocalFileIO.create()
   
           val schemaBuilder = Schema.newBuilder()
           schemaBuilder.primaryKey("partition_key", "primary_key")
           schemaBuilder.partitionKeys("partition_key")
           schemaBuilder.column("partition_key", DataTypes.STRING())
           schemaBuilder.column("primary_key", DataTypes.STRING())
           schemaBuilder.column("value", DataTypes.STRING())
   
           val schemaManager = SchemaManager(fileIO, paimonPath)
           schemaManager.createTable(schemaBuilder.build(), true)
   
           return FileStoreTableFactory.create(
               fileIO,
               paimonPath,
               schemaManager.latest().get(),
               paimonOptions,
               CatalogEnvironment.empty(),
           )
       }
   
       private fun writeRecords(
           env: StreamExecutionEnvironment,
           table: FileStoreTable,
       ) {
           val testRowData =
               listOf(
                   createRowData("a", "aa", "aaaa"),
                   createRowData("a", "ab", "abab"),
                   createRowData("b", "bb", "bbbb"),
               )
   
           FlinkSinkBuilder(table)
               .forRowData(
                   env
                       .fromData(testRowData)
                       .returns(TypeInformation.of(RowData::class.java)),
               ).build()
           env.execute("write")
       }
   
       private fun createRowData(
           partitionKey: String,
           primaryKey: String,
           value: String,
       ): RowData {
           val rowData = GenericRowData(3)
           rowData.setField(0, StringData.fromString(partitionKey))
           rowData.setField(1, StringData.fromString(primaryKey))
           rowData.setField(2, StringData.fromString(value))
           return rowData
       }
   }
   ```
   
   ### What doesn't meet your expectations?
   
   The settings `sink.committer-cpu` and `sink.committer-memory` are very 
important for compaction when both the table and the committable are very 
large.
   
   When setting these values in a streaming job, the resource configurations 
are applied correctly, and the global committer gets more resources.
   
   When setting these values in a batch job, the global committer instead gets 
allocated to an existing slot, and does not get the specified resources.
   
   This behaviour is reproducible on the Flink local MiniCluster. See logs 
attached and as inline comments. Tracing the allocation id for the Global 
Committer operator, we can see that it gets the specified resources in 
streaming mode, but not batch mode.
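
The tracing described above can be automated. The following is a minimal sketch, not part of the failing test: it assumes the log line shapes quoted in this issue, and `SlotUsage`, `traceAllocations` and `sampleLogLines` are hypothetical names introduced only for illustration. It groups deployed tasks by allocation id and attaches the `ResourceProfile` logged for that slot.

```kotlin
// Sketch only: the regexes assume the Flink 1.20 MiniCluster log lines quoted in this issue.
data class SlotUsage(val resources: String, val tasks: List<String>)

private val allocatedSlot =
    Regex("""Allocated slot for (\w+) with resources (ResourceProfile\{.*\})""")
private val deployedTask =
    Regex("""Deploying (.+?) \(\d+/\d+\) \(attempt #\d+\).* with allocation id (\w+)""")

/** Groups deployed task names by allocation id and attaches the slot's ResourceProfile. */
fun traceAllocations(logLines: List<String>): Map<String, SlotUsage> {
    val resources = mutableMapOf<String, String>()
    val tasks = mutableMapOf<String, MutableList<String>>()
    for (line in logLines) {
        val slot = allocatedSlot.find(line)
        if (slot != null) resources[slot.groupValues[1]] = slot.groupValues[2]
        val task = deployedTask.find(line)
        if (task != null) {
            tasks.getOrPut(task.groupValues[2]) { mutableListOf() }.add(task.groupValues[1])
        }
    }
    return tasks.mapValues { (id, names) -> SlotUsage(resources[id] ?: "unknown", names) }
}

// Four log lines copied from the test comments: slot + Global Committer, per mode.
val sampleLogLines = listOf(
    "[flink-pekko.actor.default-dispatcher-6] INFO " +
        "org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot " +
        "for c56c8f145715dcc5253b375dc2bff016 with resources ResourceProfile{cpuCores=2, " +
        "taskHeapMemory=1.953gb (2097152000 bytes), taskOffHeapMemory=0 bytes, " +
        "managedMemory=0 bytes, networkMemory=320.000kb (327680 bytes)}.",
    "[flink-pekko.actor.default-dispatcher-6] INFO " +
        "org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global " +
        "Committer : test_streaming -> end: Writer (1/1) (attempt #0) with attempt id " +
        "f29d6a3854e6e59149aa4d5bb957c66f_d14a00a5530c873d55f22dd652832843_0_0 and " +
        "vertex id d14a00a5530c873d55f22dd652832843_0 to " +
        "fa43ad1b-8e10-4190-a2f5-d71a4c6aa939 @ localhost (dataPort=63616) with " +
        "allocation id c56c8f145715dcc5253b375dc2bff016",
    "[flink-pekko.actor.default-dispatcher-7] INFO " +
        "org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Allocated slot " +
        "for 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35 with resources " +
        "ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), " +
        "taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=80.000mb " +
        "(83886080 bytes), networkMemory=64.000mb (67108864 bytes)}.",
    "[flink-pekko.actor.default-dispatcher-6] INFO " +
        "org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Global " +
        "Committer : test_batch -> end: Writer (1/1) (attempt #0) with attempt id " +
        "5964ede24e0614729c00b730978155d0_d14a00a5530c873d55f22dd652832843_0_0 and " +
        "vertex id d14a00a5530c873d55f22dd652832843_0 to " +
        "18dd7510-fb44-4a04-a422-3c218344346f @ localhost (dataPort=63615) with " +
        "allocation id 2cb8bf8ffd5ef76b5f02f0b3f2d3ae35",
)

fun main() {
    for ((id, usage) in traceAllocations(sampleLogLines)) {
        println("$id -> ${usage.tasks}")
        println("    ${usage.resources}")
    }
}
```

Run against the full attached log, the same grouping should show the batch-mode allocation `2cb8bf8ffd5ef76b5f02f0b3f2d3ae35` shared by the source, the bucket assigner, the writer and the Global Committer, with no `cpuCores` entry in its profile.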
   
   ### Anything else?
   
   We are handling very large tables in our jobs. Because we have multiple 
writers, we need to rely on dedicated compaction jobs in batch mode.
   
   We use high writer parallelism (scale wide), and need to allocate more 
resources for the global committer (scale tall).
   
   We have found that full compaction can struggle to complete in the Global 
Committer step if the committable gets too large (e.g. if compaction hasn't run 
frequently enough). Sometimes, the Global Committer takes longer than the 
Compaction writer step, leading to very poor resource efficiency.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

