15767714253 opened a new issue, #6739: URL: https://github.com/apache/seatunnel/issues/6739
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

2024-04-22 15:58:45,262 INFO org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Consumer subtask 1 has no restore state.
2024-04-22 15:58:45,262 INFO org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Consumer subtask 0 has no restore state.
2024-04-22 15:58:45,612 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,612 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,617 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,617 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Register reader 1 to InfluxDBSourceSplitEnumerator.
2024-04-22 15:58:45,617 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,617 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Register reader 0 to InfluxDBSourceSplitEnumerator.
2024-04-22 15:58:45,618 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 (3ab54d7a6da584f3a24702c9740a02a8) switched from INITIALIZING to RUNNING.
2024-04-22 15:58:45,618 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 (973fad63e088d82ecf6bbdcf1169a6d0) switched from INITIALIZING to RUNNING.
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Assign pendingSplits to readers [1]
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Assign pendingSplits to readers [0]
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - No more splits to assign. Sending NoMoreSplitsEvent to reader [1].
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - No more splits to assign. Sending NoMoreSplitsEvent to reader [0].
2024-04-22 15:58:45,622 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Reader received NoMoreSplits event.
2024-04-22 15:58:45,622 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Reader received NoMoreSplits event.
2024-04-22 15:58:45,627 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Closed the bounded influxDB source
2024-04-22 15:58:45,627 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Closed the bounded influxDB source
2024-04-22 15:58:45,632 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Parallel source runs complete.
2024-04-22 15:58:45,632 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Parallel source runs complete.
2024-04-22 15:58:45,632 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Finished task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0
2024-04-22 15:58:45,632 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Finished task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0
2024-04-22 15:58:45,633 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Cancel the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,633 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Cancel the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the thread pool resource.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the thread pool resource.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the split enumerator for the Apache SeaTunnel source.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the split enumerator for the Apache SeaTunnel source.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the data reader for the Apache SeaTunnel source.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the data reader for the Apache SeaTunnel source.
2024-04-22 15:58:45,635 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Close the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,635 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Close the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,636 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Closed operators for task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0
2024-04-22 15:58:45,636 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Closed operators for task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 (973fad63e088d82ecf6bbdcf1169a6d0) switched from RUNNING to FINISHED.
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 (3ab54d7a6da584f3a24702c9740a02a8) switched from RUNNING to FINISHED.
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 (973fad63e088d82ecf6bbdcf1169a6d0).
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 (3ab54d7a6da584f3a24702c9740a02a8).
2024-04-22 15:58:45,639 DEBUG org.apache.flink.runtime.taskmanager.Task [] - Release task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 network resources (state: FINISHED).
2024-04-22 15:58:45,639 DEBUG org.apache.flink.runtime.taskmanager.Task [] - Release task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 network resources (state: FINISHED).
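
To make the timeline in the log above easier to read, here is a minimal Python sketch that computes how long source subtask (1/2) stayed in the RUNNING state. The two entries are copied from the log with the long task name shortened for readability; the helper names `parse_timestamp` and `running_duration_ms` are made up for this illustration and are not part of SeaTunnel or Flink. It only summarizes the log and does not identify a root cause.

```python
from datetime import datetime, timedelta

# Two entries copied from the log above; the task name is shortened here.
LOG_LINES = [
    "2024-04-22 15:58:45,618 INFO org.apache.flink.runtime.taskmanager.Task [] - "
    "Source: SeaTunnel InfluxDBSource (1/2)#0 switched from INITIALIZING to RUNNING.",
    "2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - "
    "Source: SeaTunnel InfluxDBSource (1/2)#0 switched from RUNNING to FINISHED.",
]


def parse_timestamp(line: str) -> datetime:
    """Parse the leading 'YYYY-MM-DD HH:MM:SS,mmm' timestamp of a log line."""
    # The timestamp occupies the first 23 characters of each entry.
    return datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")


def running_duration_ms(lines) -> int:
    """Return whole milliseconds between the RUNNING and FINISHED transitions."""
    started = next(l for l in lines if "to RUNNING" in l)
    finished = next(l for l in lines if "to FINISHED" in l)
    delta = parse_timestamp(finished) - parse_timestamp(started)
    # Integer division by one millisecond avoids floating-point rounding.
    return delta // timedelta(milliseconds=1)


if __name__ == "__main__":
    print(f"Source subtask ran for {running_duration_ms(LOG_LINES)} ms")
```

The subtask goes from RUNNING to FINISHED within the same second, and the enumerator sends `NoMoreSplitsEvent` right after the "Assign pendingSplits" entries, which matches the observation that the job ends without reading any data.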
### SeaTunnel Version

2.3.0

### SeaTunnel Config

```conf
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  InfluxDB {
    result_table_name = "akr1_aux_meter_data"
    url = "https://ts-wz94cg9q58l69yn36.influxdata.tsdb.aliyuncs.com:8086"
    sql = "SELECT time,device_inst_code,active_power,apparent_power,frequency,la,power_factor,p_total_cap,reactive_power,r_total_cap,ua,create_time from akr1_aux_meter_data"
    database = "ess"
    username = "readess"
    password = "Hd@20230407"
    query_timeout_sec = 60
    connect_timeout_ms = 150000
    tag_key = "device_inst_code"
    tag_sql = "show tag values from akr1_aux_meter_data with key = \"device_inst_code\""
    partition_num = 100
    fields {
      time = BIGINT
      device_inst_code = string
      active_power = DOUBLE
      apparent_power = DOUBLE
      frequency = DOUBLE
      la = DOUBLE
      power_factor = DOUBLE
      p_total_cap = DOUBLE
      reactive_power = DOUBLE
      r_total_cap = DOUBLE
      ua = DOUBLE
      create_time = DOUBLE
    }
  }
}

transform {
  sql {
    source_table_name = "akr1_aux_meter_data"
    sql = "SELECT `time`/1000000 as `time`,device_inst_code,active_power,apparent_power,frequency,la,power_factor,p_total_cap,reactive_power,r_total_cap,ua,create_time from akr1_aux_meter_data"
  }
}

sink {
  Doris {
    nodeUrls = ["172.18.180.224:8030"]
    username = root
    password = "HDYL@2023#0614"
    database = "ess"
    table = "akr1_aux_meter_data"
    batch_max_rows = 2000
    sink.properties.format = "JSON"
    sink.properties.strip_outer_array = true
  }
}
```

### Running Command

```shell
./start-seatunnel-flink-connector-v2.sh -c ../jobs/influxdb_prod_20240422/akr1_aux_meter_data.yaml
```

### Error Exception

```log
No errors are reported at all. No data is read, and the job simply ends.
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
