[ https://issues.apache.org/jira/browse/FLINK-21569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee closed FLINK-21569.
--------------------------------
Resolution: Fixed
release-1.12: e621c70d530d52cdf8e8b638aaf02efdd3a19eb4
> Flink SQL with CSV file input job hangs
> ---------------------------------------
>
> Key: FLINK-21569
> URL: https://issues.apache.org/jira/browse/FLINK-21569
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
> Affects Versions: 1.12.1
> Reporter: Nico Kruber
> Assignee: Caizhi Weng
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.12.6
>
> Attachments: airports.csv, flights-small2.csv
>
>
> In addition to FLINK-21567, I also managed to get the job stuck on
> cancellation by doing the following in the SQL client:
> * configure the SQL client defaults to run with parallelism 2
> * execute the following statements
> {code}
> CREATE TABLE `airports` (
> `IATA_CODE` CHAR(3),
> `AIRPORT` STRING,
> `CITY` STRING,
> `STATE` CHAR(2),
> `COUNTRY` CHAR(3),
> `LATITUDE` DOUBLE NULL,
> `LONGITUDE` DOUBLE NULL,
> PRIMARY KEY (`IATA_CODE`) NOT ENFORCED
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'file:///tmp/kaggle-flight-delay/airports.csv',
> 'format' = 'csv',
> 'csv.allow-comments' = 'true',
> 'csv.ignore-parse-errors' = 'true',
> 'csv.null-literal' = ''
> );
> CREATE TABLE `flights` (
> `_YEAR` CHAR(4),
> `_MONTH` CHAR(2),
> `_DAY` CHAR(2),
> `_DAY_OF_WEEK` TINYINT,
> `AIRLINE` CHAR(2),
> `FLIGHT_NUMBER` SMALLINT,
> `TAIL_NUMBER` CHAR(6),
> `ORIGIN_AIRPORT` CHAR(3),
> `DESTINATION_AIRPORT` CHAR(3),
> `_SCHEDULED_DEPARTURE` CHAR(4),
> `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' ||
> `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
> SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
> `_DEPARTURE_TIME` CHAR(4),
> `DEPARTURE_DELAY` SMALLINT,
> `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT),
> TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' ||
> SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' ||
> SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
> `TAXI_OUT` SMALLINT,
> `WHEELS_OFF` CHAR(4),
> `SCHEDULED_TIME` SMALLINT,
> `ELAPSED_TIME` SMALLINT,
> `AIR_TIME` SMALLINT,
> `DISTANCE` SMALLINT,
> `WHEELS_ON` CHAR(4),
> `TAXI_IN` SMALLINT,
> `SCHEDULED_ARRIVAL` CHAR(4),
> `ARRIVAL_TIME` CHAR(4),
> `ARRIVAL_DELAY` SMALLINT,
> `DIVERTED` BOOLEAN,
> `CANCELLED` BOOLEAN,
> `CANCELLATION_REASON` CHAR(1),
> `AIR_SYSTEM_DELAY` SMALLINT,
> `SECURITY_DELAY` SMALLINT,
> `AIRLINE_DELAY` SMALLINT,
> `LATE_AIRCRAFT_DELAY` SMALLINT,
> `WEATHER_DELAY` SMALLINT
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv',
> 'format' = 'csv',
> 'csv.null-literal' = ''
> );
> SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS`
> FROM (
> SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`,
> ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum
> FROM flights, airports
> WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0
> GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`)
> WHERE rownum <= 10;
> {code}
> Results are shown in the CLI, but after quitting the result view, the job
> seems to be stuck in CANCELLING until at least one of the TaskManagers (TMs)
> shuts itself down because a task does not react to the cancellation signal.
> The following appears in that TM's logs:
> {code}
> 2021-03-02 18:39:19,451 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: TableSourceScan(table=[[default_catalog, default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal for 30 seconds, but is stuck in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2021-03-02 18:39:49,447 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
>     at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> 2021-03-02 18:39:49,448 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
>     at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> {code}
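The WARN entry above shows the source task thread parked in CompletableFuture.join() inside StreamTask.cleanUpInvoke. To illustrate why such a thread may not react to cancellation, the following hypothetical sketch uses only the JDK. It is not Flink code, and the class and method names are made up for illustration. It assumes, as a simplification, that the cancellation signal reaches the blocked thread as a Thread.interrupt(), and it uses a bounded Thread.join(timeout) as a stand-in for a watchdog. The sketch contrasts join(), which keeps waiting after an interrupt and only restores the interrupt flag once the future completes, with get(), which throws InterruptedException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch using only the JDK (not Flink code): a thread blocked in
 * CompletableFuture.join() ignores Thread.interrupt(), while get() reacts to it.
 * Assumption: cancellation is delivered to the blocked thread as an interrupt.
 */
public class JoinInterruptSketch {

    /** What a bounded "watchdog" wait observed for the join() case. */
    public static final class Outcome {
        public final boolean aliveAfterInterrupt;   // still blocked after interrupt + grace period
        public final boolean interruptFlagRestored; // join() re-set the flag when it returned
        public final boolean exitedAfterCompletion; // thread ended once the future completed

        Outcome(boolean alive, boolean restored, boolean exited) {
            this.aliveAfterInterrupt = alive;
            this.interruptFlagRestored = restored;
            this.exitedAfterCompletion = exited;
        }
    }

    /** Blocks a thread in join() on a pending future, interrupts it, then waits. */
    public static Outcome joinIgnoresInterrupt(long graceMillis) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        AtomicBoolean flagAfterJoin = new AtomicBoolean(false);

        Thread task = new Thread(() -> {
            pending.join(); // uninterruptible wait, like the parked frame in the log
            flagAfterJoin.set(Thread.currentThread().isInterrupted());
        }, "blocked-in-join");
        task.setDaemon(true); // never keep the JVM alive if something goes wrong
        task.start();

        task.interrupt();               // the hypothetical "cancel" signal
        joinQuietly(task, graceMillis); // watchdog stand-in: bounded wait for exit
        boolean alive = task.isAlive();

        pending.complete(null);         // only completing the future releases the thread
        joinQuietly(task, 5_000);
        return new Outcome(alive, flagAfterJoin.get(), !task.isAlive());
    }

    /** Same setup with get(), which throws InterruptedException instead. */
    public static boolean getReactsToInterrupt(long timeoutMillis) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread task = new Thread(() -> {
            try {
                pending.get();
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // woken up by the cancel signal
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }, "blocked-in-get");
        task.setDaemon(true);
        task.start();

        task.interrupt();
        joinQuietly(task, timeoutMillis);
        return sawInterrupt.get() && !task.isAlive();
    }

    /** Thread.join(millis) without a checked exception for the caller. */
    private static void joinQuietly(Thread t, long millis) {
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        Outcome o = joinIgnoresInterrupt(200);
        System.out.println("join(): alive after interrupt = " + o.aliveAfterInterrupt
                + ", flag restored = " + o.interruptFlagRestored
                + ", exited after completion = " + o.exitedAfterCompletion);
        System.out.println("get(): reacted to interrupt = " + getReactsToInterrupt(5_000));
    }
}
```

Running main should report that the join() thread is still alive after the interrupt and only exits once the future is completed, while the get() thread reacts right away. A real watchdog cannot complete the future on the task's behalf. In the log above it instead escalates after 180 seconds to a fatal error that shuts down the TaskManager.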
--
This message was sent by Atlassian Jira
(v8.3.4#803005)