AhtyTheCoders opened a new issue, #14464:
URL: https://github.com/apache/iceberg/issues/14464
### Query engine
Flink 1.20.0 running on EMR 7.10.0.
```
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-${flink.base.version}</artifactId>
    <version>1.8.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-aws-bundle</artifactId>
    <version>1.8.1</version>
    <scope>provided</scope>
</dependency>
```
### Question
I have a Flink application that uses multiple HybridSource instances to read historical data from Iceberg and then keep it up to date from a Kafka source. There are 10 such HybridSources in my code; they are joined (connected, in Flink terms) in various ways and sink to ClickHouse. When I derived the entire history from KafkaSource alone, everything worked fine, but after switching to HybridSource, checkpoints began failing with the following error:
```
[Checkpoint Timer] WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to trigger or complete checkpoint 1 for job 000000004e9d64eb0000000000000000. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: An Exception occurred while triggering the checkpoint. IO-problem detected.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2511)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1200)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1178)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:905)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Serialization failed because the record length would exceed 2GB (max addressable array size in Java).
	at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:348)
	at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:124)
	at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:115)
	at org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serializePendingSplits(IcebergEnumeratorStateSerializer.java:153)
	at org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serializeV2(IcebergEnumeratorStateSerializer.java:97)
	at org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serialize(IcebergEnumeratorStateSerializer.java:58)
	at org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer.serialize(IcebergEnumeratorStateSerializer.java:34)
	at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.snapshotState(HybridSourceSplitEnumerator.java:180)
	at org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.snapshotState(HybridSourceSplitEnumerator.java:69)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:580)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:424)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:533)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	... 6 more
```
The code essentially looks like this:
```
public static <PK, E extends Entity<PK>> DataStream<E> getProtobufConfluentCleanCdcStream(
        final StreamExecutionEnvironment env,
        final HistoricalDashboardSourceConfig<E> config) throws IOException {
    final TableLoader tableLoader = TableLoader.fromCatalog(
            IcebergConfig.getCatalogLoader(),
            config.historyConfig.icebergConfig.getTableIdentifier());
    tableLoader.open();
    final Table table = tableLoader.loadTable();
    final Schema schema = table.schema();

    // Bounded Iceberg source for the historical part of the hybrid source.
    Source<Tuple2<E, Long>, ?, ?> icebergSource = IcebergSource
            .forOutputType(new RowDataConverter<Tuple2<E, Long>>() {
                @Override
                public TypeInformation<Tuple2<E, Long>> getProducedType() {
                    return new TupleTypeInfo<>(
                            TypeInformation.of(config.entityClass), BasicTypeInfo.LONG_TYPE_INFO);
                }

                @Override
                public Tuple2<E, Long> apply(RowData rowData) {
                    return Tuple2.of(config.historyConfig.mapper.apply(rowData, schema), Long.MIN_VALUE);
                }
            })
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .streaming(false)
            .build();
    tableLoader.close();

    // Unbounded Kafka CDC source for the live part of the hybrid source.
    Source<Tuple2<E, Long>, ?, ?> cdcSource = KafkaSource.<Tuple2<E, Long>>builder()
            .setBootstrapServers(config.cdcConfig.topicConfig.brokers)
            .setTopics(config.cdcConfig.topicConfig.topic)
            .setGroupId(config.cdcConfig.topicConfig.consumerGroup)
            .setProperties(config.cdcConfig.topicConfig.props)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(new CdcKafkaProtobufConfluentDeserializer<>(
                    config.cdcConfig.topicConfig.schemaRegistry,
                    config.cdcConfig.mapper,
                    config.entityClass))
            .build();

    HybridSource<Tuple2<E, Long>> hybrid = HybridSource
            .builder(icebergSource)
            .addSource(cdcSource)
            .build();

    return env.fromSource(hybrid, WatermarkStrategy.noWatermarks(),
                    config.cdcConfig.topicConfig.topic + "_hybrid")
            .setParallelism(config.historyConfig.parallelism)
            .returns(new TupleTypeInfo<>(
                    TypeInformation.of(config.entityClass), BasicTypeInfo.LONG_TYPE_INFO))
            .keyBy(e -> e.f0.getPrimaryKey())
            .process(new FilterLate<>())
            .uid(config.cdcConfig.topicConfig.topic + "_hybrid")
            .returns(TypeInformation.of(config.entityClass));
}
```
As mentioned, there are 10 such sources, all read by a single Flink streaming job deployed in application mode.
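For context, each of the 10 hybrid sources comes from one call to this method per entity type, roughly like the following (the entity and config names here are placeholders, not the real ones):
```
// Placeholder entity types and configs; the real ones are internal business entities.
DataStream<Order> orders = getProtobufConfluentCleanCdcStream(env, orderSourceConfig);
DataStream<Payment> payments = getProtobufConfluentCleanCdcStream(env, paymentSourceConfig);
// ... 8 more such streams, which are then joined/connected and written to the ClickHouse sink.
```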
What I tried on my own that didn't work:
- Varying the splitSize, splitLookback, splitOpenFileCost, and splitComparator parameters (roughly as sketched below).
- Giving the job more machines, up to 400 cores, 3 TB of RAM, and 50 TB of disk.
- Checking the Iceberg tables: the biggest table has 18,700 data files, at most 100 manifests, and a maximum file size of 350 MB.
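The split tuning was applied on the IcebergSource builder roughly like this (a sketch only; the concrete values are placeholders, not the exact ones I tried):
```
// Sketch of the split tuning that was varied; values here are placeholders.
Source<Tuple2<E, Long>, ?, ?> icebergSource = IcebergSource
        .forOutputType(converter)                      // same RowDataConverter as in the snippet above
        .tableLoader(tableLoader)
        .assignerFactory(new SimpleSplitAssignerFactory())
        .streaming(false)
        .splitSize(512L * 1024 * 1024)                 // target split size in bytes
        .splitLookback(10)                             // bin-packing lookback when combining files into splits
        .splitOpenFileCost(16L * 1024 * 1024)          // per-file cost charged during split packing
        .build();
```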
The job is resource intensive: the biggest Iceberg source holds about 20 TB of data, the next about 6 TB, and the remaining ones are below 1 TB each. There is no data skew, and all operations within the business logic are constant when joining those streams after reading the sources.
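If I read the stack trace correctly, the state that fails to serialize is the list of pending (not yet assigned) Iceberg splits held by the enumerator. A back-of-envelope check, assuming the default read.split.target-size of 128 MB applied to the 20 TB table (an assumption on my side, since I also varied splitSize):
```
// Rough estimate only; assumes Iceberg's default 128 MB split target size on the 20 TB table.
long dataSizeBytes = 20L * 1024 * 1024 * 1024 * 1024;        // ~20 TB in the biggest table
long splitTargetBytes = 128L * 1024 * 1024;                  // default read.split.target-size
long approxSplits = dataSizeBytes / splitTargetBytes;        // ~163,840 planned splits
long limitBytes = Integer.MAX_VALUE;                         // the 2 GB array limit from the exception
long bytesPerSplitToHitLimit = limitBytes / approxSplits;    // ~13 KB of serialized metadata per split
```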
The Flink submit command looks like this:
```
flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=100000m -Djobmanager.memory.heap.size=97000m \
  -Djobmanager.memory.jvm-overhead.min=1024m -Djobmanager.memory.jvm-overhead.max=4096m \
  -Dtaskmanager.numberOfTaskSlots=13 \
  -Dtaskmanager.network.size=0.15 -Dtaskmanager.network.size.min=128mb -Dtaskmanager.network.size.max=12gb \
  -Dtaskmanager.memory.task.heap.size=58000m -Dtaskmanager.memory.process.size=117000m \
  -Dtaskmanager.memory.managed.fraction=0.25 \
  -Dtaskmanager.memory.jvm-overhead.min=256m -Dtaskmanager.memory.jvm-overhead.max=4096m \
  -Dtaskmanager.memory.task.off-heap.size=2g -Dtaskmanager.memory.framework.off-heap.size=2g \
  -Dparallelism.default=260 -Dyarn.containers.vcores=14 \
  -Dheartbeat.interval=30000 -Dheartbeat.timeout=90000 \
  -Dstate.backend.type=rocksdb \
  -Dstate.backend.rocksdb.localdir=/mnt/s3,/mnt1/s3,/mnt2/s3,/mnt3/s3,/mnt4/s3 \
  -Dstate.backend.incremental=true \
  -Dexecution.checkpointing.storage=filesystem \
  -Dexecution.checkpointing.interval=20min -Dexecution.checkpointing.min-pause=10min \
  -Dexecution.checkpointing.max-concurrent-checkpoints=1 \
  -Dexecution.checkpointing.mode=AT_LEAST_ONCE \
  -Dexecution.checkpointing.timeout=120min \
  -Dexecution.checkpointing.tolerable-failed-checkpoints=10 \
  -Dexecution.checkpointing.dir=s3a://checkpoints-dir \
  -Dfs.s3a.endpoint=s3.eu-central-1.amazonaws.com \
  -Dfs.s3a.readahead.range=104857600 -Dfs.s3a.block.size=134217728 \
  -Dfs.s3a.threads.max=20 -Dfs.s3a.connection.maximum=150 -Dfs.s3a.connection.timeout=5000 \
  -Dfs.s3a.fast.upload.buffer=disk \
  -Dfs.s3a.committer.magic.enabled=true -Dfs.s3a.committer.name=magic \
  -Dfs.s3a.multipart.size=104857600 -Dfs.s3a.aws.region=eu-central-1 \
  -Dpekko.ask.timeout=600s -Dyarn.application.name=app_name \
  -Daws.credentialsCache.enabled=true -Daws.credentialsCache.maxSize=100 -Daws.credentialsCache.ttl=15m \
  -Daws.imds.client.numRetries=100 -Daws.imds.client.connectionTimeout=40000 -Daws.imds.client.readTimeout=40000 \
  --class myMainClass s3://my_jar.jar
```