[ 
https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-30514:
-----------------------------------
    Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. 
If this ticket is a Major, please either assign yourself or give an update. 
Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> HybridSource savepoint recovery sequence
> ----------------------------------------
>
>                 Key: FLINK-30514
>                 URL: https://issues.apache.org/jira/browse/FLINK-30514
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.0, 1.15.2, 1.15.3
>            Reporter: Denis Golovachev
>            Priority: Major
>              Labels: pull-request-available, stale-major
>
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} 
> accumulates splits during recovery in
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits}}.
> As a next step, it creates a reader and pushes all {{restoredSplits}} to the 
> reader in
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}.
> The call sequence in {{setCurrentReader}} is as follows:
>  - {{reader.start()}}
>  - {{reader.addSplits()}}
> This does not seem to work when {{FileSourceReader}} is the underlying 
> reader.
> The {{FileSourceReader#start()}} method checks whether any splits are 
> available to read and executes {{sendSplitRequest}} if there are none. The 
> current {{HybridSourceReader}} recovery sequence is not aligned with this: 
> the reader is still empty when {{start()}} runs, so it requests new splits 
> before the restored ones are added.
> So, every time we recover, we immediately jump to the next splits. 
> Let me show you some logs. In this experiment, the job should have resumed 
> with files inside the 1000000 bucket, but it jumped to bucket number 2000000.
> Job Manager
> {code:java}
> 2022-12-27 13:38:47.155 StaticFileSplitEnumerator  - Assigned split to 
> subtask 1 : FileSourceSplit: 
> s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97489087)  hosts=[localhost] ID=0000000032 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to 
> subtask 9 : FileSourceSplit: 
> s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97342071)  hosts=[localhost] ID=0000000033 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator  - Assigned split to 
> subtask 6 : FileSourceSplit: 
> s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97377047)  hosts=[localhost] ID=0000000031 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 5 : FileSourceSplit: 
> s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97406878)  hosts=[localhost] ID=0000000034 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 2 : FileSourceSplit: 
> s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97536205)  hosts=[localhost] ID=0000000040 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 4 : FileSourceSplit: 
> s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97420601)  hosts=[localhost] ID=0000000035 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 8 : FileSourceSplit: 
> s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97472256)  hosts=[localhost] ID=0000000036 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 3 : FileSourceSplit: 
> s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97495880)  hosts=[localhost] ID=0000000037 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator  - Assigned split to 
> subtask 0 : FileSourceSplit: 
> s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97389425)  hosts=[localhost] ID=0000000038 position=null
> 2022-12-27 13:38:47.158 StaticFileSplitEnumerator  - Assigned split to 
> subtask 7 : FileSourceSplit: 
> s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97428709)  hosts=[localhost] ID=0000000039 position=null
> {code}
> Task Manager
> {code:java}
> 2246:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79887236)  hosts=[localhost] ID=0000000018 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
> 2247:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79987191)  hosts=[localhost] ID=0000000011 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
> 2248:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80247830)  hosts=[localhost] ID=0000000020 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
> 2249:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80055663)  hosts=[localhost] ID=0000000015 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]
> 2250:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80022187)  hosts=[localhost] ID=0000000016 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]
> 2251:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80109242)  hosts=[localhost] ID=0000000017 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]
> 2252:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79980911)  hosts=[localhost] ID=0000000012 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]
> 2253:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79996693)  hosts=[localhost] ID=0000000014 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]
> 2254:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80040476)  hosts=[localhost] ID=0000000013 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]
> 2255:2022-12-27 13:38:47.110 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79986997)  hosts=[localhost] ID=0000000019 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]
> 2265:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80109242)  hosts=[localhost] ID=0000000017 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]]
> 2266:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80055663)  hosts=[localhost] ID=0000000015 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]]
> 2267:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80022187)  hosts=[localhost] ID=0000000016 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]]
> 2268:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79996693)  hosts=[localhost] ID=0000000014 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]]
> 2269:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80247830)  hosts=[localhost] ID=0000000020 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]]
> 2270:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 80040476)  hosts=[localhost] ID=0000000013 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]]
> 2271:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79887236)  hosts=[localhost] ID=0000000018 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]]
> 2272:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79980911)  hosts=[localhost] ID=0000000012 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]]
> 2273:2022-12-27 13:38:47.115 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79986997)  hosts=[localhost] ID=0000000019 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]]
> 2275:2022-12-27 13:38:47.116 FileSourceSplitReader  - Handling split change 
> SplitAddition:[[FileSourceSplit: 
> s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
>  [0, 79987191)  hosts=[localhost] ID=0000000011 
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]]
> 2281:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97536205)  hosts=[localhost] ID=0000000040 position=null]
> 2282:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97489087)  hosts=[localhost] ID=0000000032 position=null]
> 2283:2022-12-27 13:38:47.159 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97342071)  hosts=[localhost] ID=0000000033 position=null]
> 2284:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97377047)  hosts=[localhost] ID=0000000031 position=null]
> 2285:2022-12-27 13:38:47.160 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97406878)  hosts=[localhost] ID=0000000034 position=null]
> 2288:2022-12-27 13:38:47.161 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97472256)  hosts=[localhost] ID=0000000036 position=null]
> 2289:2022-12-27 13:38:47.161 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97420601)  hosts=[localhost] ID=0000000035 position=null]
> 2292:2022-12-27 13:38:47.162 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97495880)  hosts=[localhost] ID=0000000037 position=null]
> 2293:2022-12-27 13:38:47.163 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97389425)  hosts=[localhost] ID=0000000038 position=null]
> 2295:2022-12-27 13:38:47.163 SourceReaderBase  - Adding split(s) to reader: 
> [FileSourceSplit: 
> s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
>  [0, 97428709)  hosts=[localhost] ID=0000000039 position=null]
> {code}
> The same logs are available in a GitHub gist: 
> [https://gist.github.com/WonderBeat/ddfdc852556997b09451d48766b54183]
> This can be fixed with a simple reordering in 
> {{HybridSourceReader#createReader}}: calling {{reader.addSplits()}} before 
> {{reader.start()}} sounds logical, wdyt?
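
The ordering problem described in the issue can be illustrated with a small standalone sketch. This is not Flink code: {{ToyFileReader}}, {{recover}} and the split names are hypothetical stand-ins, and the only behavior modeled is the one the report describes (a reader that sends a split request from {{start()}} when it has no splits assigned).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class RecoveryOrderSketch {

    /** Hypothetical stand-in for a reader that asks for work when started empty. */
    static class ToyFileReader {
        private final Queue<String> assignedSplits = new ArrayDeque<>();
        private final List<String> splitRequests;

        ToyFileReader(List<String> splitRequests) {
            this.splitRequests = splitRequests;
        }

        /** Mirrors the reported behavior: request a split only if none is assigned yet. */
        void start() {
            if (assignedSplits.isEmpty()) {
                splitRequests.add("sendSplitRequest");
            }
        }

        void addSplits(List<String> splits) {
            assignedSplits.addAll(splits);
        }
    }

    /**
     * Simulates recovery and returns how many split requests reached the
     * (imaginary) enumerator while the restored splits were handed over.
     */
    static int recover(List<String> restoredSplits, boolean addSplitsBeforeStart) {
        List<String> splitRequests = new ArrayList<>();
        ToyFileReader reader = new ToyFileReader(splitRequests);
        if (addSplitsBeforeStart) {
            // Proposed order: the reader already holds the restored splits at start.
            reader.addSplits(restoredSplits);
            reader.start();
        } else {
            // Reported order: the reader is empty at start and requests new splits.
            reader.start();
            reader.addSplits(restoredSplits);
        }
        return splitRequests.size();
    }

    public static void main(String[] args) {
        List<String> restored = List.of("1000000/part-00000", "1000000/part-00001");
        System.out.println("start -> addSplits: " + recover(restored, false) + " split request(s)");
        System.out.println("addSplits -> start: " + recover(restored, true) + " split request(s)");
    }
}
```

In this toy model, the reported order produces one spurious split request during recovery, while the proposed order produces none. If nothing was restored, the reader still requests a split in either order. Whether the real fix is as simple depends on details of the actual reader that the sketch leaves out.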



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
