[ https://issues.apache.org/jira/browse/FLINK-19777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219698#comment-17219698 ]
Jark Wu commented on FLINK-19777:
---------------------------------
This might happen if the job fails before the window operator has been initialized.
[~hailong wang]
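To illustrate the failure mode: if a field is only assigned during initialization and the task fails before that point, cleanup code that dereferences the field unconditionally throws a NullPointerException, which matches the WindowOperator.dispose frame in the stack trace quoted below. The following is only a minimal sketch of a null-safe cleanup, not the actual Flink code or the actual patch. The names NullSafeDisposeSketch, SketchOperator and Aggregator are hypothetical stand-ins.

```java
public class NullSafeDisposeSketch {

    /** Hypothetical stand-in for a resource that is only created during open(). */
    static class Aggregator {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    /** Hypothetical operator; not Flink's WindowOperator. */
    static class SketchOperator {
        // Stays null until open() has run.
        private Aggregator aggregator;

        void open() {
            aggregator = new Aggregator();
        }

        // Cleanup may run even if open() never did, e.g. when the task
        // fails during startup, so guard against the field still being null.
        void dispose() {
            if (aggregator != null) {
                aggregator.close();
            }
        }

        Aggregator aggregator() {
            return aggregator;
        }
    }

    public static void main(String[] args) {
        // Simulates a task that fails before the operator was opened.
        SketchOperator neverOpened = new SketchOperator();
        neverOpened.dispose();

        // Normal lifecycle: open, then dispose.
        SketchOperator opened = new SketchOperator();
        opened.open();
        opened.dispose();
        System.out.println("closed after normal lifecycle: " + opened.aggregator().closed);
    }
}
```

Without the null check in dispose(), the first call in main would throw a NullPointerException, because open() was never reached.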
> Fix NullPointException for WindowOperator.close()
> -------------------------------------------------
>
> Key: FLINK-19777
> URL: https://issues.apache.org/jira/browse/FLINK-19777
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.2
> Environment: jdk 1.8.0_262
> flink 1.11.1
> Reporter: frank wang
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> I use Flink SQL to run a job. The SQL and metadata are:
> Metadata:
> 1> source: Kafka
> create table metric_source_window_table(
> `metricName` String,
> `namespace` String,
> `timestamp` BIGINT,
> `doubleValue` DOUBLE,
> `longValue` BIGINT,
> `metricsValue` String,
> `tags` MAP<String, String>,
> `meta` Map<String, String>,
> t as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`/1000,'yyyy-MM-dd HH:mm:ss')),
> WATERMARK FOR t AS t) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ai-platform',
> 'properties.bootstrap.servers' = 'xxx',
> 'properties.group.id' = 'metricgroup',
> 'scan.startup.mode'='earliest-offset',
> 'format' = 'json',
> 'json.fail-on-missing-field' = 'false',
> 'json.ignore-parse-errors' = 'true')
> 2> sink: ClickHouse (we developed the ClickHouse connector ourselves)
> create table flink_metric_window_table(
> `timestamp` BIGINT,
> `longValue` BIGINT,
> `metricName` String,
> `metricsValueSum` DOUBLE,
> `metricsValueMin` DOUBLE,
> `metricsValueMax` DOUBLE,
> `tag_record_id` String,
> `tag_host_ip` String,
> `tag_instance` String,
> `tag_job_name` String,
> `tag_ai_app_name` String,
> `tag_namespace` String,
> `tag_ai_type` String,
> `tag_host_name` String,
> `tag_alarm_domain` String) WITH (
> 'connector.type' = 'clickhouse',
> 'connector.property-version' = '1',
> 'connector.url' = 'jdbc:clickhouse://xxx:8123/dataeye',
> 'connector.cluster'='ck_cluster',
> 'connector.write.flush.max-rows'='6000',
> 'connector.write.flush.interval'='1000',
> 'connector.table' = 'flink_metric_table_all')
> My SQL is:
> insert into
> hive.temp_vipflink.flink_metric_window_table
> select
> cast(HOP_ROWTIME(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE) AS BIGINT) AS `timestamps`,
> sum(COALESCE( `longValue`, 0)) AS longValue,
> metricName,
> sum(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueSum,
> min(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMin,
> max(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMax,
> tags ['record_id'],
> tags ['host_ip'],
> tags ['instance'],
> tags ['job_name'],
> tags ['ai_app_name'],
> tags ['namespace'],
> tags ['ai_type'],
> tags ['host_name'],
> tags ['alarm_domain']
> from
> hive.temp_vipflink.metric_source_window_table
> group by
> metricName,
> tags ['record_id'],
> tags ['host_ip'],
> tags ['instance'],
> tags ['job_name'],
> tags ['ai_app_name'],
> tags ['namespace'],
> tags ['ai_type'],
> tags ['host_name'],
> tags ['alarm_domain'],
> HOP(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE)
>
> After this SQL has been running for many hours, an exception like this appears:
> [2020-10-22 20:54:52.089] [ERROR] [GroupWindowAggregate(groupBy=[metricName,
> $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9], window=[SlidingGroupWindow('w$,
> t, 900000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[metricName, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, SUM($f11) AS
> longValue, SUM($f12) AS metricsValueSum, MIN($f12) AS metricsValueMin,
> MAX($f12) AS metricsValueMax, start('w$) AS w$start, end('w$) AS w$end,
> rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) ->
> Calc(select=[CAST(CAST(w$rowtime)) AS timestamps, longValue, metricName,
> metricsValueSum, metricsValueMin, metricsValueMax, $f1 AS EXPR$6, $f2 AS
> EXPR$7, $f3 AS EXPR$8, $f4 AS EXPR$9, $f5 AS EXPR$10, $f6 AS EXPR$11, $f7 AS
> EXPR$12, $f8 AS EXPR$13, $f9 AS EXPR$14]) -> SinkConversionToTuple2 -> Sink:
> JdbcUpsertTableSink(timestamp, longValue, metricName, metricsValueSum,
> metricsValueMin, metricsValueMax, tag_record_id, tag_host_ip, tag_instance,
> tag_job_name, tag_ai_app_name, tag_namespace, tag_ai_type, tag_host_name,
> tag_alarm_domain) (23/44)]
> [org.apache.flink.streaming.runtime.tasks.StreamTask] >>> Error during disposal of stream operator.
> java.lang.NullPointerException: null
>     at org.apache.flink.table.runtime.operators.window.WindowOperator.dispose(WindowOperator.java:318) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>
> Eventually the job hits this error and fails.