[ 
https://issues.apache.org/jira/browse/FLINK-12898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873363#comment-16873363
 ] 

Timo Walther edited comment on FLINK-12898 at 6/26/19 2:05 PM:
---------------------------------------------------------------

This seems like acceptable behavior to me. In such cases, you should implement 
a custom rowtime extractor ({{timestampsFromExtractor(TimestampExtractor 
extractor)}}) that wraps the existing one and corrects those mistakes. Next 
time, please use the user mailing list for discussing such problems, because 
what you reported is actually not a bug.
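To illustrate the idea behind such a wrapper: Flink's actual {{TimestampExtractor}} interface works on Table API expressions, so the real implementation looks different, but the defensive-parsing logic it would delegate to can be sketched as a plain helper. {{SafeTimestampParser}} and the fallback value {{0L}} below are hypothetical names chosen for this sketch, not part of the Flink API.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/**
 * Hypothetical helper showing the correction logic a custom rowtime
 * extractor could apply: parse well-formed timestamps, and map dirty
 * input (e.g. "11.22.33.44") to a sentinel instead of throwing.
 */
public class SafeTimestampParser {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Epoch millis (UTC assumed here) for valid input, 0L otherwise. */
    public static long parseOrFallback(String raw) {
        try {
            return LocalDateTime.parse(raw, FMT)
                    .toInstant(ZoneOffset.UTC)
                    .toEpochMilli();
        } catch (DateTimeParseException e) {
            return 0L; // sentinel; a real job might log and drop the record
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrFallback("2019-05-15 10:00:00"));
        System.out.println(parseOrFallback("11.22.33.44")); // dirty -> 0
    }
}
```

Whether to substitute a sentinel, drop the record, or route it to a side output is a per-job decision; the point is only that malformed values are handled before they reach the rowtime conversion.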


> FlinkSQL job fails when rowTime field encounters dirty data
> -----------------------------------------------------------
>
>                 Key: FLINK-12898
>                 URL: https://issues.apache.org/jira/browse/FLINK-12898
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.7.2
>            Reporter: Mai
>            Assignee: Mai
>            Priority: Minor
>
> I use FlinkSQL to process Kafka data in the following format:
>   
> || id || server_time ||
> | 1 | 2019-05-15 10:00:00 |
> | 2 | 2019-05-15 10:00:00 |
> | ... | ... |
>   
>  and I define rowtime from the server_time field:
> {code:java}
> new Schema()
>     .field("rowtime", Types.SQL_TIMESTAMP)
>         .rowtime(new Rowtime().timestampsFromField("server_time"))
>     .field("id", Types.STRING)
>     .field("server_time", Types.STRING)
> {code}
>   
>  when dirty data arrives, such as:
> || id || server_time ||
> | 99 | 11.22.33.44 |
>  
>  My FlinkSQL job fails with exception:
> {code:java}
> java.lang.NumberFormatException: For input string: "11.22.33.44"
>     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>     at java.lang.Integer.parseInt(Integer.java:580)
>     at java.lang.Integer.parseInt(Integer.java:615)
>     at org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625)
>     at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715)
>     at DataStreamSourceConversion$288.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748){code}
>   
>  Because my Flink job uses EXACTLY_ONCE semantics, it is re-executed from 
> the last checkpoint, consumes the dirty data again, fails again, and keeps 
> looping like this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
