[ https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leonard Xu reassigned FLINK-30514:
----------------------------------
Assignee: Chen Zhang
> HybridSource savepoint recovery sequence
> ----------------------------------------
>
> Key: FLINK-30514
> URL: https://issues.apache.org/jira/browse/FLINK-30514
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Affects Versions: 1.16.0, 1.15.2, 1.15.3
> Reporter: Denis Golovachev
> Assignee: Chen Zhang
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
>
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}}
> accumulates splits during recovery in
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits}}.
> As a next step, it creates a reader and pushes all {{restoredSplits}} to that
> reader in
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}.
> The initialization sequence in {{setCurrentReader}} is as follows:
> - {{reader.start()}}
> - {{reader.addSplits()}}
> This does not seem to work when {{FileSourceReader}} is the underlying
> reader. {{FileSourceReader#start()}} checks whether any splits are already
> assigned and, if there are none, calls {{sendSplitRequest}}. The current
> {{HybridSourceReader}} recovery sequence calls {{start()}} before the restored
> splits are added, so at that point the reader sees no splits and requests new ones.
> As a result, every time we recover, we immediately jump to the next splits.
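To make the ordering problem concrete, here is a minimal, self-contained Java model. {{MockFileReader}} and {{RecoveryOrderDemo}} are hypothetical stand-ins, not Flink classes: the mock only imitates the behavior described above for {{FileSourceReader#start()}} (request a split when no splits are assigned yet) and counts requests instead of calling {{sendSplitRequest}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FileSourceReader; NOT the Flink class.
// It only models the "request a split if nothing is assigned" check in start().
class MockFileReader {
    final List<String> assignedSplits = new ArrayList<>();
    int splitRequests = 0; // counts what would be context.sendSplitRequest() calls

    void start() {
        // Ask the enumerator for work only when no splits were handed over yet.
        if (assignedSplits.isEmpty()) {
            splitRequests++;
        }
    }

    void addSplits(List<String> splits) {
        assignedSplits.addAll(splits);
    }
}

class RecoveryOrderDemo {
    // Current recovery order: start() first, then the restored splits.
    static int splitRequestsStartFirst(List<String> restored) {
        MockFileReader reader = new MockFileReader();
        reader.start();
        reader.addSplits(restored);
        return reader.splitRequests;
    }

    // Proposed recovery order: restored splits first, then start().
    static int splitRequestsAddFirst(List<String> restored) {
        MockFileReader reader = new MockFileReader();
        reader.addSplits(restored);
        reader.start();
        return reader.splitRequests;
    }

    public static void main(String[] args) {
        List<String> restored = List.of("1000000/part-00000", "1000000/part-00001");
        System.out.println("start -> addSplits: " + splitRequestsStartFirst(restored) + " split request(s)");
        System.out.println("addSplits -> start: " + splitRequestsAddFirst(restored) + " split request(s)");
    }
}
```

Running {{main}} reports one split request for the current order and none for the reordered one, even though restored splits exist in both cases. Without restored splits (a fresh start), both orders issue exactly one request, so the reordering would not change normal startup in this model.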
> Here are some logs. In this experiment, the job should have resumed with the
> files in the 1000000 bucket, but it jumped to bucket 2000000.
> Job Manager
> {code:java}
> 2022-12-27 13:38:47.155 StaticFileSplitEnumerator - Assigned split to
> subtask 1 : FileSourceSplit:
> s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97489087) hosts=[localhost] ID=0000000032 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to
> subtask 9 : FileSourceSplit:
> s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97342071) hosts=[localhost] ID=0000000033 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to
> subtask 6 : FileSourceSplit:
> s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97377047) hosts=[localhost] ID=0000000031 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to
> subtask 5 : FileSourceSplit:
> s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97406878) hosts=[localhost] ID=0000000034 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to
> subtask 2 : FileSourceSplit:
> s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97536205) hosts=[localhost] ID=0000000040 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to
> subtask 4 : FileSourceSplit:
> s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97420601) hosts=[localhost] ID=0000000035 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to
> subtask 8 : FileSourceSplit:
> s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97472256) hosts=[localhost] ID=0000000036 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to
> subtask 3 : FileSourceSplit:
> s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97495880) hosts=[localhost] ID=0000000037 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to
> subtask 0 : FileSourceSplit:
> s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97389425) hosts=[localhost] ID=0000000038 position=null
> 2022-12-27 13:38:47.158 StaticFileSplitEnumerator - Assigned split to
> subtask 7 : FileSourceSplit:
> s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97428709) hosts=[localhost] ID=0000000039 position=null
> {code}
> Task Manager
> {code:java}
> 2246:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79887236) hosts=[localhost] ID=0000000018
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
> 2247:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79987191) hosts=[localhost] ID=0000000011
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
> 2248:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80247830) hosts=[localhost] ID=0000000020
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
> 2249:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80055663) hosts=[localhost] ID=0000000015
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]
> 2250:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80022187) hosts=[localhost] ID=0000000016
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]
> 2251:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80109242) hosts=[localhost] ID=0000000017
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]
> 2252:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79980911) hosts=[localhost] ID=0000000012
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]
> 2253:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79996693) hosts=[localhost] ID=0000000014
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]
> 2254:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80040476) hosts=[localhost] ID=0000000013
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]
> 2255:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79986997) hosts=[localhost] ID=0000000019
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]
> 2265:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80109242) hosts=[localhost] ID=0000000017
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]]
> 2266:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80055663) hosts=[localhost] ID=0000000015
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]]
> 2267:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80022187) hosts=[localhost] ID=0000000016
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]]
> 2268:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79996693) hosts=[localhost] ID=0000000014
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]]
> 2269:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80247830) hosts=[localhost] ID=0000000020
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]]
> 2270:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 80040476) hosts=[localhost] ID=0000000013
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]]
> 2271:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79887236) hosts=[localhost] ID=0000000018
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]]
> 2272:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79980911) hosts=[localhost] ID=0000000012
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]]
> 2273:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79986997) hosts=[localhost] ID=0000000019
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]]
> 2275:2022-12-27 13:38:47.116 FileSourceSplitReader - Handling split change
> SplitAddition:[[FileSourceSplit:
> s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet
> [0, 79987191) hosts=[localhost] ID=0000000011
> position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]]
> 2281:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97536205) hosts=[localhost] ID=0000000040 position=null]
> 2282:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97489087) hosts=[localhost] ID=0000000032 position=null]
> 2283:2022-12-27 13:38:47.159 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97342071) hosts=[localhost] ID=0000000033 position=null]
> 2284:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97377047) hosts=[localhost] ID=0000000031 position=null]
> 2285:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97406878) hosts=[localhost] ID=0000000034 position=null]
> 2288:2022-12-27 13:38:47.161 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97472256) hosts=[localhost] ID=0000000036 position=null]
> 2289:2022-12-27 13:38:47.161 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97420601) hosts=[localhost] ID=0000000035 position=null]
> 2292:2022-12-27 13:38:47.162 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97495880) hosts=[localhost] ID=0000000037 position=null]
> 2293:2022-12-27 13:38:47.163 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97389425) hosts=[localhost] ID=0000000038 position=null]
> 2295:2022-12-27 13:38:47.163 SourceReaderBase - Adding split(s) to reader:
> [FileSourceSplit:
> s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet
> [0, 97428709) hosts=[localhost] ID=0000000039 position=null]
> {code}
> The same logs are available in a GitHub gist:
> [https://gist.github.com/WonderBeat/ddfdc852556997b09451d48766b54183]
> This can be fixed with a simple reordering in
> {{HybridSourceReader#setCurrentReader}}, around the {{createReader}} call:
> calling {{reader.addSplits()}} before {{reader.start()}} sounds logical.
> What do you think?
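The proposed reordering can be sketched as a simplified model. This is not the actual Flink code: {{SketchReader}}, {{RestoredSplit}} and {{ReorderedHybridReaderSketch}} are hypothetical names, splits are plain strings, and the filtering of restored splits by source index is an assumption about how the restored buffer is consumed. The only point it illustrates is that the restored splits reach the new reader before {{start()}} is called.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical minimal reader interface; NOT Flink's SourceReader.
interface SketchReader {
    void addSplits(List<String> splits);
    void start();
}

// Hypothetical restored split: owning source index plus a split id.
class RestoredSplit {
    final int sourceIndex;
    final String splitId;

    RestoredSplit(int sourceIndex, String splitId) {
        this.sourceIndex = sourceIndex;
        this.splitId = splitId;
    }
}

class ReorderedHybridReaderSketch {
    // Splits collected during recovery, possibly for several underlying sources.
    final List<RestoredSplit> restoredSplits = new ArrayList<>();
    SketchReader currentReader;
    int currentSourceIndex = -1;

    void setCurrentReader(int index, SketchReader reader) {
        currentSourceIndex = index;
        currentReader = reader;

        // 1. Take the restored splits that belong to this source out of the buffer.
        List<String> splits = new ArrayList<>();
        Iterator<RestoredSplit> it = restoredSplits.iterator();
        while (it.hasNext()) {
            RestoredSplit restored = it.next();
            if (restored.sourceIndex == index) {
                splits.add(restored.splitId);
                it.remove();
            }
        }

        // 2. Hand them over BEFORE start(), so a start() that requests a split
        //    only when nothing is assigned sees the restored splits.
        if (!splits.isEmpty()) {
            reader.addSplits(splits);
        }
        reader.start();
    }

    public static void main(String[] args) {
        ReorderedHybridReaderSketch hybrid = new ReorderedHybridReaderSketch();
        hybrid.restoredSplits.add(new RestoredSplit(0, "1000000/part-00000"));
        hybrid.restoredSplits.add(new RestoredSplit(1, "2000000/part-00000"));

        List<String> calls = new ArrayList<>();
        hybrid.setCurrentReader(0, new SketchReader() {
            @Override
            public void addSplits(List<String> s) { calls.add("addSplits " + s); }

            @Override
            public void start() { calls.add("start"); }
        });
        // Shows the order in which the reader observed the calls.
        System.out.println(calls);
    }
}
```

Splits restored for other sources (index 1 above) stay buffered until that source's reader is created, which keeps the per-source hand-over while only changing the order of the two calls.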
--
This message was sent by Atlassian Jira
(v8.20.10#820010)