[ https://issues.apache.org/jira/browse/FLINK-21569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17387849#comment-17387849 ]
Caizhi Weng edited comment on FLINK-21569 at 7/27/21, 7:37 AM:
---------------------------------------------------------------
Hi all!
I looked into this issue today and found that it is caused by [a bug|https://github.com/FasterXML/jackson-dataformats-text/issues/191] in Jackson 2.10.
The bug is triggered when the 4000th character of a CSV input split is '\n'.
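To hit this boundary deliberately, one could generate an input whose 4000th character is a row-terminating '\n'. The sketch below is a hypothetical helper (the class name, filler rows and padding scheme are made up for illustration). It assumes that "4000th character" means 0-based index 3999 and that the split starts at offset 0 of the file. Because the trace goes through CsvDecoder.skipLinesWhenNeeded, parser options that skip lines (the airports table enables 'csv.allow-comments') may also matter; treat that as an assumption.

```java
/**
 * Hypothetical helper that builds CSV text whose 4000th character
 * (0-based index 3999) is the '\n' terminating a row.
 */
class CsvBoundaryInput {
    // 1-based position named in the comment; the 0-based index is 3999.
    static final int TRIGGER_POSITION = 4000;

    static String build() {
        StringBuilder sb = new StringBuilder();
        String fillerRow = "ABC,Some Airport,CA\n"; // arbitrary filler row (assumption)
        String lastPrefix = "XYZ,";                 // start of the row ending at the boundary
        int minLastRow = lastPrefix.length() + 1;   // prefix plus its '\n'

        // Append whole filler rows while a (possibly padded) last row can
        // still end exactly at TRIGGER_POSITION.
        while (sb.length() + fillerRow.length() + minLastRow <= TRIGGER_POSITION) {
            sb.append(fillerRow);
        }

        // Pad the last row so its terminating '\n' is the 4000th character.
        int padding = TRIGGER_POSITION - sb.length() - minLastRow;
        sb.append(lastPrefix);
        for (int i = 0; i < padding; i++) {
            sb.append('P');
        }
        sb.append('\n');

        // One more row so the parser has to continue past the boundary.
        sb.append("DEF,Another Airport,NY\n");
        return sb.toString();
    }
}
```

Writing the returned string to a file and reading it through a 'csv' filesystem table on Flink 1.12 should, if the diagnosis holds, make the parser fail at the boundary.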
Since Jackson 2.11 and later fix this issue, and Flink 1.13 has already upgraded
Jackson to 2.12, I'd like to update the Jackson version in Flink 1.12 as well.
However, I see that FLINK-21020 only bumped Jackson in Flink 1.11 and 1.12 from
2.10.1 to 2.10.5 rather than to a higher version. Are there any considerations
when updating a dependency version just for a bug fix? [~chesnay] [~rmetzger] I'd
appreciate your advice. Thanks.
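The version reasoning can be written down as a tiny check. This is a hypothetical helper, not part of Flink or Jackson; it only encodes the claim above that the fix is in Jackson 2.11 and later, so every 2.10.x release (including the 2.10.5 bump from FLINK-21020) still has the bug.

```java
/** Hypothetical check: does a Jackson version contain the CSV fix (2.11+)? */
class JacksonFixCheck {
    static boolean containsCsvFix(String version) {
        String[] parts = version.split("\\.");
        int major = leadingInt(parts[0]);
        int minor = parts.length > 1 ? leadingInt(parts[1]) : 0;
        // Per the comment, the fix is available from 2.11 onwards.
        return major > 2 || (major == 2 && minor >= 11);
    }

    // Parses the leading digits of a version component, e.g. "5-rc1" -> 5.
    private static int leadingInt(String s) {
        int end = 0;
        while (end < s.length() && Character.isDigit(s.charAt(end))) {
            end++;
        }
        return end == 0 ? 0 : Integer.parseInt(s.substring(0, end));
    }
}
```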
P.S.: If parse errors are not ignored, we get the following stack trace, which
points to that Jackson bug.
{code:java}
java.lang.RuntimeException: Failed to fetch next result
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
	at org.apache.flink.table.client.MyTest.myTest(MyTest.java:42)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.IOException: Failed to fetch job execution result
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	... 28 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
	... 30 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166)
	... 30 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
	at akka.actor.Actor$class.aroundReceive(Actor.scala)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to deserialize CSV row.
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:261)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 4000
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.skipLinesWhenNeeded(CsvDecoder.java:527)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.startNewLine(CsvDecoder.java:499)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleObjectRowEnd(CsvParser.java:1067)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:858)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:665)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:280)
	at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:252)
	... 5 more
{code}
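The useful part of such a trace is the innermost cause; the layers above it are the same failure being rewrapped on its way from the source task to the client. A minimal, hypothetical helper that walks getCause() down to the root (with an identity-based guard against cause cycles) could look like this; demoChain() is likewise made up and only rebuilds a shortened version of the chain from the trace.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical helper: follows getCause() links to the innermost exception. */
class RootCause {
    static Throwable of(Throwable t) {
        // Identity-based set so that a cause cycle (a -> b -> a) cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    /** Shortened version of the cause chain from the trace above. */
    static Throwable demoChain() {
        Throwable root = new ArrayIndexOutOfBoundsException("4000");
        Throwable row = new IOException("Failed to deserialize CSV row.", root);
        Throwable job = new IOException("Failed to fetch job execution result", row);
        return new RuntimeException("Failed to fetch next result", job);
    }
}
```

Applied to the chain above, RootCause.of returns the ArrayIndexOutOfBoundsException raised inside the shaded Jackson CsvDecoder, which is what ties the failure to the Jackson bug rather than to Flink's CSV format code.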
> Flink SQL with CSV file input job hangs
> ---------------------------------------
>
> Key: FLINK-21569
> URL: https://issues.apache.org/jira/browse/FLINK-21569
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
> Affects Versions: 1.12.1
> Reporter: Nico Kruber
> Priority: Minor
> Labels: auto-deprioritized-major
> Attachments: airports.csv, flights-small2.csv
>
>
> Further to FLINK-21567, I also managed to get the job stuck on
> cancellation by doing the following in the SQL client:
> * configure the SQL client defaults to run with parallelism 2
> * execute the following statements
> {code}
> CREATE TABLE `airports` (
> `IATA_CODE` CHAR(3),
> `AIRPORT` STRING,
> `CITY` STRING,
> `STATE` CHAR(2),
> `COUNTRY` CHAR(3),
> `LATITUDE` DOUBLE NULL,
> `LONGITUDE` DOUBLE NULL,
> PRIMARY KEY (`IATA_CODE`) NOT ENFORCED
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'file:///tmp/kaggle-flight-delay/airports.csv',
> 'format' = 'csv',
> 'csv.allow-comments' = 'true',
> 'csv.ignore-parse-errors' = 'true',
> 'csv.null-literal' = ''
> );
> CREATE TABLE `flights` (
> `_YEAR` CHAR(4),
> `_MONTH` CHAR(2),
> `_DAY` CHAR(2),
> `_DAY_OF_WEEK` TINYINT,
> `AIRLINE` CHAR(2),
> `FLIGHT_NUMBER` SMALLINT,
> `TAIL_NUMBER` CHAR(6),
> `ORIGIN_AIRPORT` CHAR(3),
> `DESTINATION_AIRPORT` CHAR(3),
> `_SCHEDULED_DEPARTURE` CHAR(4),
> `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' ||
> `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
> SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
> `_DEPARTURE_TIME` CHAR(4),
> `DEPARTURE_DELAY` SMALLINT,
> `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT),
> TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' ||
> SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
> SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
> `TAXI_OUT` SMALLINT,
> `WHEELS_OFF` CHAR(4),
> `SCHEDULED_TIME` SMALLINT,
> `ELAPSED_TIME` SMALLINT,
> `AIR_TIME` SMALLINT,
> `DISTANCE` SMALLINT,
> `WHEELS_ON` CHAR(4),
> `TAXI_IN` SMALLINT,
> `SCHEDULED_ARRIVAL` CHAR(4),
> `ARRIVAL_TIME` CHAR(4),
> `ARRIVAL_DELAY` SMALLINT,
> `DIVERTED` BOOLEAN,
> `CANCELLED` BOOLEAN,
> `CANCELLATION_REASON` CHAR(1),
> `AIR_SYSTEM_DELAY` SMALLINT,
> `SECURITY_DELAY` SMALLINT,
> `AIRLINE_DELAY` SMALLINT,
> `LATE_AIRCRAFT_DELAY` SMALLINT,
> `WEATHER_DELAY` SMALLINT
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv',
> 'format' = 'csv',
> 'csv.null-literal' = ''
> );
> SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS`
> FROM (
> SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`,
> ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum
> FROM flights, airports
> WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0
> GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`)
> WHERE rownum <= 10;
> {code}
> Results are shown in the CLI, but after quitting the result view, the job
> seems to be stuck in CANCELLING until at least one of the TMs shuts itself
> down because a task does not react to the cancellation signal. The following
> appears in that TM's logs:
> {code}
> 2021-03-02 18:39:19,451 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: TableSourceScan(table=[[default_catalog, default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2021-03-02 18:39:49,447 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
> at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> 2021-03-02 18:39:49,448 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
> at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
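On why the task ignores the cancelling signal: the WARN entry shows the task thread parked in CompletableFuture.join() inside StreamTask.cleanUpInvoke. The standalone JDK sketch below is not Flink code and does not explain why that future never completes; it only illustrates the JDK behaviour involved, namely that a thread blocked in join() keeps waiting when interrupted, whereas get() throws InterruptedException. The class and method names are hypothetical, and the 100 ms and 500 ms waits are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical demo: join() is not interruptible, get() is. */
class JoinVsInterrupt {
    /** True if a thread parked in join() is still alive after being interrupted. */
    static boolean joinSurvivesInterrupt() {
        CompletableFuture<Void> neverDone = new CompletableFuture<>();
        Thread waiter = new Thread(neverDone::join, "join-waiter");
        waiter.start();
        pause(100);               // let the waiter park inside join()
        waiter.interrupt();       // roughly what a cancellation attempt does
        awaitExit(waiter, 500);   // give it a chance to react
        boolean stillBlocked = waiter.isAlive();
        neverDone.complete(null); // release the waiter so the demo terminates
        awaitExit(waiter, 5_000);
        return stillBlocked;
    }

    /** True if a thread waiting in get() exits after being interrupted. */
    static boolean getExitsOnInterrupt() {
        CompletableFuture<Void> neverDone = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try {
                neverDone.get();
            } catch (InterruptedException e) {
                // get() throws here, so the thread can clean up and exit
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }, "get-waiter");
        waiter.start();
        pause(100);
        waiter.interrupt();
        awaitExit(waiter, 500);
        return !waiter.isAlive();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void awaitExit(Thread t, long millis) {
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the future a task joins on in cleanUpInvoke never completes, interrupting the task thread cannot unblock it, which is consistent with the watchdog eventually escalating to a TaskManager shutdown as in the log above.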
--
This message was sent by Atlassian Jira
(v8.3.4#803005)