randomdev2026 commented on issue #15418:
URL: https://github.com/apache/iceberg/issues/15418#issuecomment-3959367081

   Thanks for your quick response. 
   
   Could you please give me a hint on how to check whether Job2 is being restarted from state?
   
   Currently my jobs run locally on Flink 2.0.1 (downloaded from 
https://flink.apache.org/downloads/), submitted via `flink run` on the CLI; Job2 is 
terminated with Ctrl+C and additionally cancelled via the Flink Dashboard.
   There is currently no restart/recovery configuration as described in 
https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/task_failure_recovery/
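   
   For reference, this is how I understand a stop/restore cycle that actually carries state over would look with the Flink CLI (job id, jar name, and savepoint path below are placeholders, not my real setup):
   
   ```shell
   # Stop Job2 gracefully and write a savepoint first
   flink stop --savepointPath s3a://bucket/savepoints <job-id>
   
   # Later: resume Job2 from that savepoint, so that the Iceberg source
   # enumerator position is restored (the stop command prints the actual path)
   flink run -s s3a://bucket/savepoints/<savepoint-dir> my-job2.jar
   ```
   
   With a plain Ctrl+C or a Dashboard cancel followed by a plain `flink run`, no state is restored and the job starts fresh.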
   
   
   Some relevant logs (TaskManager logs taken from the Flink Dashboard, plus some 
from logs/flink-myname-standalonesession-0-AM-C2FP39PT9K.log):
   
   
   Both jobs are running; one event was produced to Kafka beforehand, and both 
jobs process it:
   
   ```
   2026-02-25 13:25:10,597 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=This is a predefined message., timestamp=2026-02-25T12:24:59.762Z, 
sourceTopic=my-topic)
   2026-02-25 13:25:14,351 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file s3a://bucket/mynamespace/my-topic/metadata/v2.metadata.json
   2026-02-25 13:25:14,386 INFO  org.apache.iceberg.SnapshotProducer            
              [] - Committed snapshot 3959837430340685647 (MergeAppend)
   2026-02-25 13:26:38,690 INFO  de.xyz.flink.Job2                              
              [] - Mapping Iceberg (S3 Source, ...): +I(,This is a predefined 
message.,2026-02-25T12:24:59.762)
   2026-02-25 13:26:53,757 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file s3a://bucket/mynamespace/my-topic/metadata/v3.metadata.json
   2026-02-25 13:26:53,783 INFO  org.apache.iceberg.SnapshotProducer            
              [] - Committed snapshot 2722258325086330201 (MergeAppend)
   2026-02-25 13:28:39,174 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
dd7d24b8e1431422349f07ccf400c793 from job leader monitoring.
   ```
   
   Job2 has been stopped (see the last log line above). Then 10 messages were 
submitted to Kafka (printed next by Job1):
   
   ```
   2026-02-25 13:29:13,433 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#1, timestamp=2026-02-25T12:29:13.385Z, sourceTopic=my-topic)
   2026-02-25 13:29:13,433 INFO  org.apache.hadoop.io.compress.CodecPool        
              [] - Got brand-new compressor [.zstd]
   2026-02-25 13:29:13,656 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Start to 
flush snapshot state to state backend, table: 
icebergCatalog.mynamespace.my-topic, checkpointId: 25
   2026-02-25 13:29:13,670 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Checkpoint 25 
completed. Attempting commit.
   2026-02-25 13:29:13,678 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committing 
append for checkpoint 25 to table icebergCatalog.mynamespace.my-topic branch 
main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=1, 
dataFilesByteCount=954, deleteFilesCount=0, deleteFilesRecordCount=0, 
deleteFilesByteCount=0}
   2026-02-25 13:29:13,781 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file s3a://bucket/mynamespace/my-topic/metadata/v5.metadata.json
   2026-02-25 13:29:13,817 INFO  org.apache.iceberg.SnapshotProducer            
              [] - Committed snapshot 2254417509816172352 (MergeAppend)
   ...
   2026-02-25 13:29:13,841 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committed 
append to table: icebergCatalog.mynamespace.my-topic, branch: main, 
checkpointId 25 in 162 ms
   2026-02-25 13:29:13,869 INFO  org.apache.iceberg.BaseMetastoreCatalog        
              [] - Table loaded by catalog: icebergCatalog.mynamespace.my-topic
   2026-02-25 13:29:14,260 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#2, timestamp=2026-02-25T12:29:14.212Z, sourceTopic=my-topic)
   2026-02-25 13:29:14,261 INFO  org.apache.hadoop.io.compress.CodecPool        
              [] - Got brand-new compressor [.zstd]
   2026-02-25 13:29:15,114 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#3, timestamp=2026-02-25T12:29:15.067Z, sourceTopic=my-topic)
   2026-02-25 13:29:15,935 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#4, timestamp=2026-02-25T12:29:15.893Z, sourceTopic=my-topic)
   2026-02-25 13:29:16,804 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#5, timestamp=2026-02-25T12:29:16.757Z, sourceTopic=my-topic)
   2026-02-25 13:29:17,670 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#6, timestamp=2026-02-25T12:29:17.620Z, sourceTopic=my-topic)
   2026-02-25 13:29:18,505 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#7, timestamp=2026-02-25T12:29:18.459Z, sourceTopic=my-topic)
   2026-02-25 13:29:19,371 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#8, timestamp=2026-02-25T12:29:19.323Z, sourceTopic=my-topic)
   2026-02-25 13:29:20,228 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#9, timestamp=2026-02-25T12:29:20.184Z, sourceTopic=my-topic)
   2026-02-25 13:29:21,088 INFO  de.xyz.flink.Job1                              
              [] - Mapping Iceberg (Kafka Source, S3 Sink): 
Message(headers=RecordHeaders(headers = [], isReadOnly = false), key=, 
value=message--#10, timestamp=2026-02-25T12:29:21.040Z, sourceTopic=my-topic)
   2026-02-25 13:29:23,652 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Start to 
flush snapshot state to state backend, table: 
icebergCatalog.mynamespace.my-topic, checkpointId: 26
   2026-02-25 13:29:23,665 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Checkpoint 26 
completed. Attempting commit.
   2026-02-25 13:29:23,672 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committing 
append for checkpoint 26 to table icebergCatalog.mynamespace.my-topic branch 
main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=9, 
dataFilesByteCount=1063, deleteFilesCount=0, deleteFilesRecordCount=0, 
deleteFilesByteCount=0}
   2026-02-25 13:29:23,771 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file s3a://bucket/mynamespace/my-topic/metadata/v6.metadata.json
   2026-02-25 13:29:23,797 INFO  org.apache.iceberg.SnapshotProducer            
              [] - Committed snapshot 8512307484827503552 (MergeAppend)
   ^^ I think this is my problem: the restarted Job2 resumes from this snapshot 
(see the ContinuousSplitPlannerImpl logs below), but I think it should start at 
2722258325086330201, since that is the last snapshot the Iceberg source in Job2 
had actually seen.
   ```
   
   Job2 has been started again:
   
   ```
   2026-02-25 13:30:05,344 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: 
[IcebergSourceSplit{files=[SplitScanTask{file=s3a://bucket/mynamespace/my-topic/data/ts_day=2026-02-25/00000-0-e0324423-1386-4ddc-adf2-aaf212340021-00003.parquet,
 start=4, length=1059}], fileOffset=0, recordOffset=0}]
   2026-02-25 13:30:05,345 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
   2026-02-25 13:30:05,345 INFO  
org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader [] - Add 1 
splits to reader
   2026-02-25 13:30:05,430 INFO  org.apache.hadoop.io.compress.CodecPool        
              [] - Got brand-new decompressor [.zstd]
   2026-02-25 13:30:05,631 INFO  
org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader [] - Split 
reader 0 finished split: 
IcebergSourceSplit{files=[SplitScanTask{file=s3a://bucket/mynamespace/my-topic/data/ts_day=2026-02-25/00000-0-e0324423-1386-4ddc-adf2-aaf212340021-00003.parquet,
 start=4, length=1059}]}
   2026-02-25 13:30:05,631 INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits 
[IcebergSourceSplit{files=[SplitScanTask{file=s3a://bucket/mynamespace/my-topic/data/ts_day=2026-02-25/00000-0-e0324423-1386-4ddc-adf2-aaf212340021-00003.parquet,
 start=4, length=1059}]}]
   2026-02-25 13:30:05,631 INFO  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Legacy 
kryo serializer scala extensions are not available.
   2026-02-25 13:30:05,631 INFO  
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo 
serializer scala extensions are not available.
   2026-02-25 13:30:05,633 INFO  de.xyz.flink.Job2                              
              [] - Mapping Iceberg (S3 Source, ...): 
+I(,message--#2,2026-02-25T12:29:14.212)
   ^^ here it starts with message--#2, not with message--#1
   ...
   ```
   
   
   Log from the standalonesession log file:
   
   ```
   2026-02-25 13:29:59,835 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request 1d7619d3b085e791b75651d54bfae874 for job 
ce9d5ffdb1d691b7984169442992c97b from resource manager with leader id 
00000000000000000000000000000000.
   Job2 is now running:
   ...
   2026-02-25 13:29:59,769 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering job manager 
[email protected]://flink@localhost:6123/user/rpc/jobmanager_4
 for job ce9d5ffdb1d691b7984169442992c97b.
   2026-02-25 13:29:59,769 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registered job manager 
[email protected]://flink@localhost:6123/user/rpc/jobmanager_4
 for job ce9d5ffdb1d691b7984169442992c97b.
   2026-02-25 13:29:59,769 INFO  org.apache.flink.runtime.jobmaster.JobMaster   
              [] - JobManager successfully registered at ResourceManager, 
leader id: 00000000000000000000000000000000.
   2026-02-25 13:29:59,769 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] 
- Received resource requirements from job ce9d5ffdb1d691b7984169442992c97b: 
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, 
numberOfRequiredSlots=1}]
   2026-02-25 13:29:59,784 INFO  
org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl [] - Get 
starting snapshot id 8512307484827503552 based on strategy 
INCREMENTAL_FROM_LATEST_SNAPSHOT
   2026-02-25 13:29:59,784 INFO  
org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl [] - 
Start incremental scan with start snapshot (inclusive): id = 
8512307484827503552, timestamp = 1772022563748
   ```
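   
   If I read the `ContinuousSplitPlannerImpl` log correctly, with 
`INCREMENTAL_FROM_LATEST_SNAPSHOT` a fresh (non-restored) start plans from 
whatever the table's latest snapshot is at startup, so snapshots committed while 
Job2 was down are skipped. As a sketch (not my actual job code; `tableLoader` is 
assumed to exist), the start position is configured on the source builder roughly 
like this:
   
   ```java
   IcebergSource<RowData> source =
       IcebergSource.forRowData()
           .tableLoader(tableLoader)
           .streaming(true)
           // Strategy seen in the log above: a fresh start plans from the
           // table's current latest snapshot.
           .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
           // Alternative: pin the start to a known snapshot id
           // (see the strategy javadoc for inclusive/exclusive semantics):
           // .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
           // .startSnapshotId(2722258325086330201L)
           .build();
   ```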
   
   Could there be a problem with running both Flink jobs on the same TaskManager? 
I also found PR https://github.com/apache/iceberg/pull/15282; maybe this (CDC) is 
what I need?

