[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael updated FLINK-28817:
----------------------------
Description:
Scenario:
# CheckpointCoordinator - Completed checkpoint 14 for job 00000000000000000000000000000000
# HybridSource successfully finished processing several SourceFactories that read from S3.
# HybridSourceSplitEnumerator.switchEnumerator failed with com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed out. This is an intermittent error; it is usually resolved when Flink recovers from the checkpoint and repeats the operation.
# Flink starts recovering from the checkpoint.
# HybridSourceSplitEnumerator receives SourceReaderFinishedEvent{sourceIndex=-1}.
# Processing this event causes:
2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught exception in the SplitEnumerator for Source Source: hybrid-source while handling operator event SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 6. Triggering job failover.
java.lang.NullPointerException: Source for index=0 is not available from sources: {788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
I'm running my own version of the Hybrid Source with additional logging, so line numbers and some class names may differ from the Flink GitHub sources.
My observation: the problem is intermittent. Sometimes it works correctly, i.e. the SourceReaderFinishedEvent arrives with the correct sourceIndex. As far as I can see from my logs, it works when my SourceFactory.create() is executed BEFORE HybridSourceSplitEnumerator.handleSourceEvent handles the SourceReaderFinishedEvent. If HybridSourceSplitEnumerator.handleSourceEvent is executed before my SourceFactory.create(), the SourceReaderFinishedEvent arrives with sourceIndex=-1.
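The stack trace suggests that the enumerator answers a SourceReaderFinishedEvent by switching the reader to sourceIndex + 1, so a reader that reports sourceIndex=-1 after a restore is asked for source 0, which the restored enumerator no longer holds. Below is a minimal, self-contained Java model of that lookup, not Flink code: SourcesModel, naiveNextIndex and guardedNextIndex are hypothetical names, index 47 is borrowed from the restore log purely for illustration, and the guard (never switch a reader to an index below the enumerator's current source) is one possible mitigation, not a confirmed fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SourceIndexGuardSketch {

    // Hypothetical stand-in for SwitchedSources: maps a source index to a source description.
    static final class SourcesModel {
        private final Map<Integer, String> sources = new HashMap<>();

        void put(int index, String source) {
            sources.put(index, source);
        }

        // Throws NullPointerException when the index was never registered,
        // mirroring the Preconditions.checkNotNull failure in the report.
        String sourceOf(int index) {
            return Objects.requireNonNull(
                    sources.get(index),
                    () -> "Source for index=" + index + " is not available from sources: " + sources);
        }
    }

    // Naive rule implied by the stack trace: a reader that finished source i is switched to i + 1.
    static int naiveNextIndex(int finishedIndex) {
        return finishedIndex + 1;
    }

    // Hypothetical guard: never switch a reader to an index below the enumerator's current source.
    // A reader reporting -1 after restore is therefore sent the current source instead of index 0.
    static int guardedNextIndex(int finishedIndex, int currentSourceIndex) {
        return Math.max(finishedIndex + 1, currentSourceIndex);
    }

    public static void main(String[] args) {
        // After restore only the current source is registered (47 is illustrative, taken from the log).
        int currentSourceIndex = 47;
        SourcesModel restored = new SourcesModel();
        restored.put(currentSourceIndex, "restored source " + currentSourceIndex);

        int reportedIndex = -1; // as in SourceReaderFinishedEvent{sourceIndex=-1}

        try {
            restored.sourceOf(naiveNextIndex(reportedIndex));
        } catch (NullPointerException e) {
            System.out.println("naive: " + e.getMessage());
        }

        String source = restored.sourceOf(guardedNextIndex(reportedIndex, currentSourceIndex));
        System.out.println("guarded: " + source);
    }
}
```

Running main prints the NullPointerException message for the naive path (index 0 is missing) and the restored source for the guarded path. Whether clamping to the current source is the right behavior for the real enumerator is an open question; it only shows why a stale index from a reader turns into a lookup failure.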
was:
Scenario:
# CheckpointCoordinator - Completed checkpoint 14 for job 00000000000000000000000000000000
# HybridSource successfully finished processing several SourceFactories that read from S3.
# The next SourceFactory tries to read the contents of an S3 directory, which causes the error: Unable to execute HTTP request: Read timed out
# CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from Checkpoint 14
# HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
# This restore fails because of a NullPointerException in HybridSourceSplitEnumerator.close.
# Because of this issue, all later attempts to restore from the checkpoint also failed.
Extract from the log: --------------
2022/08/02 22:26:51.227 INFO o.a.f.r.c.CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from Checkpoint 14 @ 1659478803949 for 00000000000000000000000000000000 located at s3://spp-state-371299021277-tech-aidata-di/mb-backfill-jul-20-backfill-prd/2/checkpoints/00000000000000000000000000000000/chk-14.
2022/08/02 22:26:51.240 INFO o.a.f.r.c.CheckpointCoordinator - No master state to restore
2022/08/02 22:26:51.240 INFO o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
2022/08/02 22:26:51.241 INFO o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: hybrid-source.
2022/08/02 22:26:51.424 INFO o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: hybrid-source from checkpoint.
2022/08/02 22:26:51.425 INFO o.a.f.r.s.c.SourceCoordinator - Starting split enumerator for source Source: hybrid-source.
2022/08/02 22:26:51.426 INFO c.i.d.s.f.s.c.b.HourlyFileSourceFactory - Reading input data from path s3://idl-kafka-connect-ued-raw-uw2-data-lake-prd/data/topics/sbseg-qbo-clickstream/d_20220729-2300 for 2022-07-29T23:00:00Z
2022/08/02 22:26:51.426 INFO o.a.f.c.b.s.h.HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
2022/08/02 22:26:51.435 INFO o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator fd9fbc680ee884c4eafd0b9c2d3d007f).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
...
Caused by: java.lang.NullPointerException: null
at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151)
at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70)
at java.lang.Thread.run(Thread.java:750)
-----------------------------------
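The report does not say which reference is null at HybridSourceSplitEnumerator.close. One plausible cause, and it is only an assumption here, is that switching to the next source failed (for example on the S3 read timeout) before the underlying enumerator was assigned, leaving a null field that close() then dereferences. A minimal Java sketch of a null-tolerant close(), using hypothetical types (Enumerator, SwitchingEnumerator) rather than Flink's SplitEnumerator API:

```java
import java.util.function.Supplier;

public class NullSafeCloseSketch {

    // Hypothetical stand-in for an underlying split enumerator (not Flink's SplitEnumerator).
    interface Enumerator extends AutoCloseable {
        @Override
        void close() throws Exception;
    }

    // Hypothetical wrapper whose delegate is created by a factory that may fail,
    // e.g. with an S3 "Read timed out" error during creation.
    static final class SwitchingEnumerator implements AutoCloseable {
        private Enumerator current; // remains null if creation never succeeded

        void switchTo(Supplier<Enumerator> factory) {
            Enumerator next = factory.get(); // may throw; current is left unchanged
            current = next;
        }

        @Override
        public void close() throws Exception {
            // Guard against the case where no delegate was ever created.
            if (current != null) {
                current.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SwitchingEnumerator enumerator = new SwitchingEnumerator();
        try {
            enumerator.switchTo(() -> {
                throw new RuntimeException("Unable to execute HTTP request: Read timed out");
            });
        } catch (RuntimeException e) {
            System.out.println("creation failed: " + e.getMessage());
        }
        enumerator.close(); // does not throw, because current is still null
        System.out.println("closed cleanly");
    }
}
```

The null check only keeps close() from masking the original failure; it does not address why the restore or the source switch failed in the first place.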
> NullPointerException in HybridSource when restoring from checkpoint
> -------------------------------------------------------------------
>
> Key: FLINK-28817
> URL: https://issues.apache.org/jira/browse/FLINK-28817
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.14.4, 1.15.1
> Reporter: Michael
> Priority: Major
> Attachments: Preconditions-checkNotNull-error.zip,
> bf-29-JM-err-analysis.log
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)