randomdev2026 commented on issue #15418: URL: https://github.com/apache/iceberg/issues/15418#issuecomment-3959367081
Thanks for your quick response. Could you please give me a hint about how to check whether I am restarting Job2 from state? Currently my jobs run locally on Flink 2.0.1 (from https://flink.apache.org/downloads/), submitted with `flink run` on the CLI; my second job is terminated with Ctrl+C and additionally via the Flink Dashboard. There is currently no configuration according to https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/task_failure_recovery/.

Some relevant logs (TaskManager logs taken from the Apache Flink Dashboard, and some from a file in logs/flink-myname-standalonesession-0-AM-C2FP39PT9K.log):

Both jobs are running, and one event was produced to Kafka beforehand. Both jobs are doing their work:

```
2026-02-25 13:25:10,597 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=This is a predefined message., timestamp=2026-02-25T12:24:59.762Z, sourceTopic=my-topic)
2026-02-25 13:25:14,351 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file s3a://bucket/mynamespace/my-topic/metadata/v2.metadata.json
2026-02-25 13:25:14,386 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 3959837430340685647 (MergeAppend)
2026-02-25 13:26:38,690 INFO de.xyz.flink.Job2 [] - Mapping Iceberg (S3 Source, ...): +I(,This is a predefined message.,2026-02-25T12:24:59.762)
2026-02-25 13:26:53,757 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file s3a://bucket/mynamespace/my-topic/metadata/v3.metadata.json
2026-02-25 13:26:53,783 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 2722258325086330201 (MergeAppend)
2026-02-25 13:28:39,174 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job dd7d24b8e1431422349f07ccf400c793 from job leader monitoring.
```

Job2 has been stopped (last log line).
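For what it's worth, my current stop/restart procedure involves no savepoints at all. If "restarting from state" means resuming from a savepoint, a sketch of what I would have to do instead looks roughly like this (the savepoint directory, job id, and jar name are placeholders, not my actual setup):

```shell
# Placeholder paths and job id -- a sketch of savepoint-based stop/resume,
# not commands I am currently running.

# Stop Job2 gracefully and take a savepoint, instead of plain Ctrl+C:
flink stop --savepointPath /tmp/flink-savepoints <job2-id>

# Resubmit Job2 explicitly from that savepoint so it restores its state:
flink run -s /tmp/flink-savepoints/savepoint-<id> job2.jar
```

With a plain Ctrl+C followed by a fresh `flink run` (no `-s`), my understanding is that the job starts with empty state.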
10 messages have been submitted to Kafka (they will be printed next by Job1):

```
2026-02-25 13:29:13,433 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#1, timestamp=2026-02-25T12:29:13.385Z, sourceTopic=my-topic)
2026-02-25 13:29:13,433 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.zstd]
2026-02-25 13:29:13,656 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Start to flush snapshot state to state backend, table: icebergCatalog.mynamespace.my-topic, checkpointId: 25
2026-02-25 13:29:13,670 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Checkpoint 25 completed. Attempting commit.
2026-02-25 13:29:13,678 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing append for checkpoint 25 to table icebergCatalog.mynamespace.my-topic branch main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=1, dataFilesByteCount=954, deleteFilesCount=0, deleteFilesRecordCount=0, deleteFilesByteCount=0}
2026-02-25 13:29:13,781 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file s3a://bucket/mynamespace/my-topic/metadata/v5.metadata.json
2026-02-25 13:29:13,817 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 2254417509816172352 (MergeAppend)
...
2026-02-25 13:29:13,841 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed append to table: icebergCatalog.mynamespace.my-topic, branch: main, checkpointId 25 in 162 ms
2026-02-25 13:29:13,869 INFO org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: icebergCatalog.mynamespace.my-topic
2026-02-25 13:29:14,260 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#2, timestamp=2026-02-25T12:29:14.212Z, sourceTopic=my-topic)
2026-02-25 13:29:14,261 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.zstd]
2026-02-25 13:29:15,114 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#3, timestamp=2026-02-25T12:29:15.067Z, sourceTopic=my-topic)
2026-02-25 13:29:15,935 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#4, timestamp=2026-02-25T12:29:15.893Z, sourceTopic=my-topic)
2026-02-25 13:29:16,804 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#5, timestamp=2026-02-25T12:29:16.757Z, sourceTopic=my-topic)
2026-02-25 13:29:17,670 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#6, timestamp=2026-02-25T12:29:17.620Z, sourceTopic=my-topic)
2026-02-25 13:29:18,505 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#7, timestamp=2026-02-25T12:29:18.459Z, sourceTopic=my-topic)
2026-02-25 13:29:19,371 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#8, timestamp=2026-02-25T12:29:19.323Z, sourceTopic=my-topic)
2026-02-25 13:29:20,228 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#9, timestamp=2026-02-25T12:29:20.184Z, sourceTopic=my-topic)
2026-02-25 13:29:21,088 INFO de.xyz.flink.Job1 [] - Mapping Iceberg (Kafka Source, S3 Sink): Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, value=message--#10, timestamp=2026-02-25T12:29:21.040Z, sourceTopic=my-topic)
2026-02-25 13:29:23,652 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Start to flush snapshot state to state backend, table: icebergCatalog.mynamespace.my-topic, checkpointId: 26
2026-02-25 13:29:23,665 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Checkpoint 26 completed. Attempting commit.
2026-02-25 13:29:23,672 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing append for checkpoint 26 to table icebergCatalog.mynamespace.my-topic branch main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=9, dataFilesByteCount=1063, deleteFilesCount=0, deleteFilesRecordCount=0, deleteFilesByteCount=0}
2026-02-25 13:29:23,771 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file s3a://bucket/mynamespace/my-topic/metadata/v6.metadata.json
2026-02-25 13:29:23,797 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 8512307484827503552 (MergeAppend)
```

^^ I think this is my problem: Job2 later resumes from this snapshot (see the ContinuousSplitPlannerImpl logs below), but I think it should start at 2722258325086330201, since that is the last snapshot Job2 had actually seen.

Job2 has been started again:

```
2026-02-25 13:30:05,344 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [IcebergSourceSplit{files=[SplitScanTask{file=s3a://bucket/mynamespace/my-topic/data/ts_day=2026-02-25/00000-0-e0324423-1386-4ddc-adf2-aaf212340021-00003.parquet, start=4, length=1059}], fileOffset=0, recordOffset=0}]
2026-02-25 13:30:05,345 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2026-02-25 13:30:05,345 INFO org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader [] - Add 1 splits to reader
2026-02-25 13:30:05,430 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new decompressor [.zstd]
2026-02-25 13:30:05,631 INFO org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader [] - Split reader 0 finished split: IcebergSourceSplit{files=[SplitScanTask{file=s3a://bucket/mynamespace/my-topic/data/ts_day=2026-02-25/00000-0-e0324423-1386-4ddc-adf2-aaf212340021-00003.parquet, start=4, length=1059}]}
2026-02-25 13:30:05,631 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [IcebergSourceSplit{files=[SplitScanTask{file=s3a://bucket/mynamespace/my-topic/data/ts_day=2026-02-25/00000-0-e0324423-1386-4ddc-adf2-aaf212340021-00003.parquet, start=4, length=1059}]}]
2026-02-25 13:30:05,631 INFO org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Legacy kryo serializer scala extensions are not available.
2026-02-25 13:30:05,631 INFO org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo serializer scala extensions are not available.
2026-02-25 13:30:05,633 INFO de.xyz.flink.Job2 [] - Mapping Iceberg (S3 Source, ...): +I(,message--#2,2026-02-25T12:29:14.212)
```

^^ there it starts with message--#2, not with message--#1 ...

Log from the standalonesession file, where Job2 is starting up again:

```
2026-02-25 13:29:59,835 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 1d7619d3b085e791b75651d54bfae874 for job ce9d5ffdb1d691b7984169442992c97b from resource manager with leader id 00000000000000000000000000000000.
...
2026-02-25 13:29:59,769 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager [email protected]://flink@localhost:6123/user/rpc/jobmanager_4 for job ce9d5ffdb1d691b7984169442992c97b.
2026-02-25 13:29:59,769 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager [email protected]://flink@localhost:6123/user/rpc/jobmanager_4 for job ce9d5ffdb1d691b7984169442992c97b.
2026-02-25 13:29:59,769 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2026-02-25 13:29:59,769 INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Received resource requirements from job ce9d5ffdb1d691b7984169442992c97b: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2026-02-25 13:29:59,784 INFO org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl [] - Get starting snapshot id 8512307484827503552 based on strategy INCREMENTAL_FROM_LATEST_SNAPSHOT
2026-02-25 13:29:59,784 INFO org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl [] - Start incremental scan with start snapshot (inclusive): id = 8512307484827503552, timestamp = 1772022563748
```

Is there maybe a problem with running both Flink jobs on the same TaskManager? I also found PR https://github.com/apache/iceberg/pull/15282; maybe this (CDC) is what I need?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
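PS: regarding the INCREMENTAL_FROM_LATEST_SNAPSHOT lines in the standalonesession log, here is a minimal sketch of how I understand the source configuration, assuming Job2 uses the `IcebergSource` builder from iceberg-flink. The class name and table path are placeholders, and the commented-out INCREMENTAL_FROM_SNAPSHOT_ID lines show the alternative I am wondering about, not code I am actually running:

```java
// Sketch under assumptions: iceberg-flink-runtime on the classpath and a
// Hadoop-catalog table at a placeholder s3a:// path. Not my actual job code.
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;

public class SourceSketch {

  static IcebergSource<RowData> buildSource() {
    TableLoader loader = TableLoader.fromHadoopTable("s3a://bucket/mynamespace/my-topic");

    return IcebergSource.forRowData()
        .tableLoader(loader)
        .streaming(true)
        // What the logs show today: on every (re)start, begin at the table's
        // latest snapshot, so commits made while the reader was down are skipped.
        .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
        // Alternative I am wondering about: pin the start to the last snapshot
        // the reader had seen (value taken from the logs above):
        // .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
        // .startSnapshotId(2722258325086330201L)
        .build();
  }
}
```

My understanding is that INCREMENTAL_FROM_LATEST_SNAPSHOT is only meant as the starting point for a job with no restored state; is that correct?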
