AhtyTheCoders opened a new issue, #14463:
URL: https://github.com/apache/iceberg/issues/14463

   ### Query engine
   
   I have a Flink application using multiple HybridSource classes to derive 
historical data from Iceberg and then keep updating online from kafka source, 
so there are 10 HybridSources in my code that essentially are getting joined ( 
connected in Flink terms ) one way or another and sink to Clickhouse. When I 
was deriving entire history from KafkaSource alone, everything was working 
fine, but when I switched to HybridSource checkpoints began failing with the 
following problem:
   ```[Checkpoint Timer] WARN 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to 
trigger or complete checkpoint 1 for job 000000004e9d64eb0000000000000000. (0 
consecutive failed attempts so far) 
org.apache.flink.runtime.checkpoint.CheckpointException: An Exception occurred 
while triggering the checkpoint. IO-problem detected. at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2511)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1200)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1178)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:905)
 at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
 at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.jav
 a:907) at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
java.io.IOException: Serialization failed because the record length would 
exceed 2GB (max addressable array size in Java). at 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:348)
 at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.
 java:124) at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:115)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serializePendingSplits(IcebergEnumeratorStateSerializer.java:153)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serializeV2(IcebergEnumeratorStateSerializer.java:97)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serialize(IcebergEnumeratorStateSerializer.java:58)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serialize(IcebergEnumeratorStateSerializer.java:34)
 at 
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.snapshotState(HybridSourceSplitEnumerator.java:180)
 at 
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.snapshotState(HybridSourceSplitEnumerator.java:69)
 at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:580)
  at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:424)
 at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:533)
 at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 ... 6 more```
   
   The code essentially looks like this:
   
   
   ### Question
   
   I have a Flink application using multiple HybridSource classes to derive 
historical data from Iceberg and then keep updating online from kafka source, 
so there are 10 HybridSources in my code that essentially are getting joined ( 
connected in Flink terms ) one way or another and sink to Clickhouse. When I 
was deriving entire history from KafkaSource alone, everything was working 
fine, but when I switched to HybridSource checkpoints began failing with the 
following problem:
   ```
   [Checkpoint Timer] WARN 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to 
trigger or complete checkpoint 1 for job 000000004e9d64eb0000000000000000. (0 
consecutive failed attempts so far) 
org.apache.flink.runtime.checkpoint.CheckpointException: An Exception occurred 
while triggering the checkpoint. IO-problem detected. at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2511)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1200)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1178)
 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:905)
 at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
 at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:9
 07) at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
java.io.IOException: Serialization failed because the record length would 
exceed 2GB (max addressable array size in Java). at 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:348)
 at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.jav
 a:124) at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:115)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serializePendingSplits(IcebergEnumeratorStateSerializer.java:153)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serializeV2(IcebergEnumeratorStateSerializer.java:97)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serialize(IcebergEnumeratorStateSerializer.java:58)
 at 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serialize(IcebergEnumeratorStateSerializer.java:34)
 at 
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.snapshotState(HybridSourceSplitEnumerator.java:180)
 at 
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.snapshotState(HybridSourceSplitEnumerator.java:69)
 at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:580)
 at
  
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:424)
 at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:533)
 at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 ... 6 more
   ```
   
   The code essentially looks like this:
   
   ```
   public static <
               PK,
               E extends Entity<PK>>
       DataStream<E> getProtobufConfluentCleanCdcStream(
               final StreamExecutionEnvironment env,
               final HistoricalDashboardSourceConfig<E> config
       ) throws IOException {
           final TableLoader tableLoader = 
TableLoader.fromCatalog(IcebergConfig.getCatalogLoader(), 
config.historyConfig.icebergConfig.getTableIdentifier());
   
           tableLoader.open();
   
           final Table table = tableLoader.loadTable();
           final Schema schema = table.schema();
   
           Source<Tuple2<E, Long>, ?, ?> icebergSource = IcebergSource
                   .forOutputType(new RowDataConverter<Tuple2<E, Long>>() {
                       @Override
                       public TypeInformation<Tuple2<E, Long>> 
getProducedType() {
                           return new 
TupleTypeInfo<>(TypeInformation.of(config.entityClass), 
BasicTypeInfo.LONG_TYPE_INFO);
                       }
   
                       @Override
                       public Tuple2<E, Long> apply(RowData rowData) {
                           return 
Tuple2.of(config.historyConfig.mapper.apply(rowData, schema), Long.MIN_VALUE);
                       }
                   })
                   .tableLoader(tableLoader)
                   .assignerFactory(new SimpleSplitAssignerFactory())
                   .streaming(false)
                   .build();
   
           tableLoader.close();
   
           Source<Tuple2<E, Long>, ?, ?> cdcSource = KafkaSource.<Tuple2<E, 
Long>>builder()
                   .setBootstrapServers(config.cdcConfig.topicConfig.brokers)
                   .setTopics(config.cdcConfig.topicConfig.topic)
                   .setGroupId(config.cdcConfig.topicConfig.consumerGroup)
                   .setProperties(config.cdcConfig.topicConfig.props)
                   .setStartingOffsets(OffsetsInitializer.earliest())
                   .setDeserializer(new 
CdcKafkaProtobufConfluentDeserializer<>(config.cdcConfig.topicConfig.schemaRegistry,
 config.cdcConfig.mapper, config.entityClass))
                   .build();
   
           HybridSource<Tuple2<E, Long>> hybrid = HybridSource
                   .builder(icebergSource)
                   .addSource(cdcSource)
                   .build();
   
           return env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), 
config.cdcConfig.topicConfig.topic + "_hybrid")
                   .setParallelism(config.historyConfig.parallelism)
                   .returns(new 
TupleTypeInfo<>(TypeInformation.of(config.entityClass), 
BasicTypeInfo.LONG_TYPE_INFO))
                   .keyBy(e -> e.f0.getPrimaryKey())
                   .process(new FilterLate<>())
                   .uid(config.cdcConfig.topicConfig.topic + "_hybrid")
                   .returns(TypeInformation.of(config.entityClass));
   
       }
   ```
   
   As I said there are 10 of such sources that are read in flink streaming 
application mode.
   
   What I tried on my own that didn't work:
   1. Vary parameters splitSize, splitLookback, splitOpenFileCost, 
splitComparator. 
   2. Tried to give more machines up to 400 cores, 3TB RAM, 50TB disk.
   3. Researched Iceberg tables for amount of files for the biggest table = 
18700, amount of manifests for the biggest table <= 100, max file in bytes = 
350MB.
   
   The job is resource intensive, the biggest iceberg source data amount is 
20TB, next is 6TB, other ones are lower than 1TB. There is no Data Skew and all 
operations are constant inside the business logic after reading sources.
   
   Flink submit looks like this:
   `flink run-application -t yarn-application 
-Djobmanager.memory.process.size=100000m -Djobmanager.memory.heap.size=97000m 
-Djobmanager.memory.jvm-overhead.min=1024m 
-Djobmanager.memory.jvm-overhead.max=4096m -Dtaskmanager.numberOfTaskSlots=13 
-Dtaskmanager.network.size=0.15 -Dtaskmanager.network.size.min=128mb 
-Dtaskmanager.network.size.max=12gb -Dtaskmanager.memory.task.heap.size=58000m 
-Dtaskmanager.memory.process.size=117000m 
-Dtaskmanager.memory.managed.fraction=0.25 
-Dtaskmanager.memory.jvm-overhead.min=256m 
-Dtaskmanager.memory.jvm-overhead.max=4096m 
-Dtaskmanager.memory.task.off-heap.size=2g 
-Dtaskmanager.memory.framework.off-heap.size=2g -Dparallelism.default=260 
-Dyarn.containers.vcores=14 -Dheartbeat.interval=30000 
-Dheartbeat.timeout=90000 -Dstate.backend.type=rocksdb 
-Dstate.backend.rocksdb.localdir=/mnt/s3,/mnt1/s3,/mnt2/s3,/mnt3/s3,/mnt4/s3 
-Dstate.backend.incremental=true -Dexecution.checkpointing.storage=filesystem 
-Dexecution.checkpointing.interval=20min -Dexecutio
 n.checkpointing.min-pause=10min 
-Dexecution.checkpointing.max-concurrent-checkpoints=1 
-Dexecution.checkpointing.mode=AT_LEAST_ONCE 
-Dexecution.checkpointing.timeout=120min 
-Dexecution.checkpointing.tolerable-failed-checkpoints=10 
-Dexecution.checkpointing.dir=s3a://checkpoints-dir 
-Dfs.s3a.endpoint=s3.eu-central-1.amazonaws.com 
-Dfs.s3a.readahead.range=104857600 -Dfs.s3a.block.size=134217728 
-Dfs.s3a.threads.max=20 -Dfs.s3a.connection.maximum=150 
-Dfs.s3a.connection.timeout=5000 -Dfs.s3a.fast.upload.buffer=disk 
-Dfs.s3a.committer.magic.enabled=true -Dfs.s3a.committer.name=magic 
-Dfs.s3a.multipart.size=104857600 -Dfs.s3a.aws.region=eu-central-1 
-Dpekko.ask.timeout=600s -Dyarn.application.name=app_name 
-Daws.credentialsCache.enabled=true -Daws.credentialsCache.maxSize=100 
-Daws.credentialsCache.ttl=15m -Daws.imds.client.numRetries=100 
-Daws.imds.client.connectionTimeout=40000 -Daws.imds.client.readTimeout=40000 
--class com.bytefrontier.dataplatform.dashboard.client_attached_agg.Clien
 tAttachedAgg s3://artifacts-dp/preprod/flink/jars/client_attached_agg.jar`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to