[ https://issues.apache.org/jira/browse/FLINK-19777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220443#comment-17220443 ]

hailong wang commented on FLINK-19777:
--------------------------------------

You are right. More precisely, the dispose method is called before the open 
method has run when the job fails before the operator is initialized. The 
close method is invoked when the job finishes or is canceled; when the job 
fails, the dispose method is invoked instead.
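
For reference, here is a minimal, framework-free sketch of the pattern this
implies (class and field names are illustrative, not the actual WindowOperator
code): anything that is assigned only in open() must be null-checked in
dispose(), because dispose() can run on a failure path where open() never
executed.

// Minimal sketch of the lifecycle hazard; names are illustrative,
// not Flink's actual WindowOperator fields.
public class LifecycleSketch {

    private AutoCloseable resource; // assigned only in open()

    void open() {
        resource = () -> System.out.println("resource closed");
    }

    // Called on the failure path; open() may never have run.
    void dispose() throws Exception {
        if (resource != null) { // the guard prevents the NullPointerException
            resource.close();
        }
    }

    public static void main(String[] args) throws Exception {
        LifecycleSketch op = new LifecycleSketch();
        // Simulate a job that fails before the operator is initialized:
        // dispose() runs although open() was never called. Without the
        // null check, this would throw NullPointerException as in the report.
        op.dispose();
        System.out.println("dispose survived the not-yet-opened operator");
    }
}

A null guard of the same shape in WindowOperator.dispose() (skipping whatever
open() never created) would avoid the stack trace quoted below.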

> Fix NullPointerException for WindowOperator.close()
> ---------------------------------------------------
>
>                 Key: FLINK-19777
>                 URL: https://issues.apache.org/jira/browse/FLINK-19777
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: jdk 1.8.0_262
> flink 1.11.1
>            Reporter: frank wang
>            Assignee: Jark Wu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> I use Flink SQL to run a job. The SQL and metadata are:
>  meta:
> 1> source: kafka
>  create table metric_source_window_table(
> `metricName` String,
> `namespace` String,
> `timestamp` BIGINT,
> `doubleValue` DOUBLE,
> `longValue` BIGINT,
> `metricsValue` String,
> `tags` MAP<String, String>,
> `meta` MAP<String, String>,
> t as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000,'yyyy-MM-dd HH:mm:ss')),
> WATERMARK FOR t AS t) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ai-platform',
> 'properties.bootstrap.servers' = 'xxx',
> 'properties.group.id' = 'metricgroup',
> 'scan.startup.mode'='earliest-offset',
> 'format' = 'json',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true')
> 2> sink to ClickHouse (the clickhouse-connector was developed in-house)
> create table flink_metric_window_table(
> `timestamp` BIGINT,
> `longValue` BIGINT,
> `metricName` String,
> `metricsValueSum` DOUBLE,
> `metricsValueMin` DOUBLE,
> `metricsValueMax` DOUBLE,
> `tag_record_id` String,
> `tag_host_ip` String,
> `tag_instance` String,
> `tag_job_name` String,
> `tag_ai_app_name` String,
> `tag_namespace` String,
> `tag_ai_type` String,
> `tag_host_name` String,
> `tag_alarm_domain` String) WITH (
> 'connector.type' = 'clickhouse',
> 'connector.property-version' = '1',
> 'connector.url' = 'jdbc:clickhouse://xxx:8123/dataeye',
> 'connector.cluster'='ck_cluster',
> 'connector.write.flush.max-rows'='6000',
> 'connector.write.flush.interval'='1000',
> 'connector.table' = 'flink_metric_table_all')
> my SQL is:
> insert into
>  hive.temp_vipflink.flink_metric_window_table
> select
>  cast(HOP_ROWTIME(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE) AS BIGINT) AS `timestamps`,
>  sum(COALESCE(`longValue`, 0)) AS longValue,
>  metricName,
>  sum(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueSum,
>  min(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMin,
>  max(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMax,
>  tags ['record_id'],
>  tags ['host_ip'],
>  tags ['instance'],
>  tags ['job_name'],
>  tags ['ai_app_name'],
>  tags ['namespace'],
>  tags ['ai_type'],
>  tags ['host_name'],
>  tags ['alarm_domain']
> from
>  hive.temp_vipflink.metric_source_window_table
>  group by 
>  metricName,
>  tags ['record_id'],
>  tags ['host_ip'],
>  tags ['instance'],
>  tags ['job_name'],
>  tags ['ai_app_name'],
>  tags ['namespace'],
>  tags ['ai_type'],
>  tags ['host_name'],
>  tags ['alarm_domain'],
>  HOP(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE)
>  
> After this SQL job has run for several hours, an exception like this appears:
> [2020-10-22 20:54:52.089] [ERROR] [GroupWindowAggregate(groupBy=[metricName, 
> $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9], window=[SlidingGroupWindow('w$, 
> t, 900000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
> select=[metricName, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, SUM($f11) AS 
> longValue, SUM($f12) AS metricsValueSum, MIN($f12) AS metricsValueMin, 
> MAX($f12) AS metricsValueMax, start('w$) AS w$start, end('w$) AS w$end, 
> rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> 
> Calc(select=[CAST(CAST(w$rowtime)) AS timestamps, longValue, metricName, 
> metricsValueSum, metricsValueMin, metricsValueMax, $f1 AS EXPR$6, $f2 AS 
> EXPR$7, $f3 AS EXPR$8, $f4 AS EXPR$9, $f5 AS EXPR$10, $f6 AS EXPR$11, $f7 AS 
> EXPR$12, $f8 AS EXPR$13, $f9 AS EXPR$14]) -> SinkConversionToTuple2 -> Sink: 
> JdbcUpsertTableSink(timestamp, longValue, metricName, metricsValueSum, 
> metricsValueMin, metricsValueMax, tag_record_id, tag_host_ip, tag_instance, 
> tag_job_name, tag_ai_app_name, tag_namespace, tag_ai_type, tag_host_name, 
> tag_alarm_domain) (23/44)] 
> [org.apache.flink.streaming.runtime.tasks.StreamTask] >>> Error during disposal of stream operator.
> java.lang.NullPointerException: null
>     at org.apache.flink.table.runtime.operators.window.WindowOperator.dispose(WindowOperator.java:318) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>  
> Finally, this job errors out and the whole job fails.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
