[ 
https://issues.apache.org/jira/browse/FLINK-10299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ambition updated FLINK-10299:
-----------------------------
    Description: 
Flink sql deal with User behavior data collection, such as:
{code:java}
{
    "event_id": "session_start",
    "timestamp": "-",    // error data,
    "viewport_height": "667",
     "viewport_width": "-"    //error data
}
{code}
Causing exception info :
{code:java}
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - 
Could not restart the job Flink Streaming Job 
(6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it.
java.lang.ClassCastException: java.lang.String cannot be cast to 
java.sql.Timestamp
at 
org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - 
Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91.
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
checkpoint.StandaloneCompletedCheckpointStore 
(StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down
{code}
Use Flink  checkpoint function and Uncatch exception lead to  Could not restart 
this job,  so just error data happen exception set null, like under image.hope 
flink commiter provide better solution。

!image-2018-09-07-17-47-04-343.png!

 

  was:
Flink sql deal with User behavior data collection, such as:
{code:java}
{
    "event_id": "session_start",
    "timestamp": "-",    // error data,
    "viewport_height": "667",
     "viewport_width": "-"    //error data
}
{code}
Causing exception info :
{code:java}
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - 
Could not restart the job Flink Streaming Job 
(6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it.
java.lang.ClassCastException: java.lang.String cannot be cast to 
java.sql.Timestamp
at 
org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - 
Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91.
2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
checkpoint.StandaloneCompletedCheckpointStore 
(StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down
{code}
we use Flink  checkpoint function and Uncatch exception lead to  Could not 
restart this job,  so we just simple ,hope flink commiter provide better 
solution。

!image-2018-09-07-17-47-04-343.png!

 


> RowSerializer.copy data value cast exception
> --------------------------------------------
>
>                 Key: FLINK-10299
>                 URL: https://issues.apache.org/jira/browse/FLINK-10299
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.6.0
>            Reporter: ambition
>            Priority: Minor
>         Attachments: image-2018-09-07-17-47-04-343.png
>
>
> Flink sql deal with User behavior data collection, such as:
> {code:java}
> {
>     "event_id": "session_start",
>     "timestamp": "-",    // error data,
>     "viewport_height": "667",
>      "viewport_width": "-"    //error data
> }
> {code}
> Causing exception info :
> {code:java}
> 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
> executiongraph.ExecutionGraph (ExecutionGraph.java:tryRestartOrFail(1511)) - 
> Could not restart the job Flink Streaming Job 
> (6f0248219c631158f6e38f2dca0beb91) because the restart strategy prevented it.
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.sql.Timestamp
> at 
> org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:27)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
> at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
> checkpoint.CheckpointCoordinator (CheckpointCoordinator.java:shutdown(320)) - 
> Stopping checkpoint coordinator for job 6f0248219c631158f6e38f2dca0beb91.
> 2018-09-07 10:47:01,834 [flink-akka.actor.default-dispatcher-2] INFO 
> checkpoint.StandaloneCompletedCheckpointStore 
> (StandaloneCompletedCheckpointStore.java:shutdown(102)) - Shutting down
> {code}
> Use Flink  checkpoint function and Uncatch exception lead to  Could not 
> restart this job,  so just error data happen exception set null, like under 
> image.hope flink commiter provide better solution。
> !image-2018-09-07-17-47-04-343.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to