[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711073#comment-16711073
 ] 

ASF GitHub Bot commented on FLINK-11010:
----------------------------------------

lzqdename edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927
 
 
   Let me show how the wrong result is produced.
   
   ---
   **Background**: processing time in a tumbling window, Flink 1.5.0
   
   The call stack is as follows:
     [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp 
(SqlFunctions.java:1,747)
     [2] 
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect 
(TimeWindowPropertyCollector.scala:53)
     [3] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply
 (IncrementalAggregateWindowFunction.scala:74)
     [4] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply
 (IncrementalAggregateTimeWindowFunction.scala:72)
     [5] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply
 (IncrementalAggregateTimeWindowFunction.scala:39)
     [6] 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process
 (InternalSingleValueWindowFunction.java:46)
     [7] 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents
 (WindowOperator.java:550)
     [8] 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime
 (WindowOperator.java:505)
     [9] 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime
 (HeapInternalTimerService.java:266)
     [10] 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run
 (SystemProcessingTimeService.java:281)
     [11] java.util.concurrent.Executors$RunnableAdapter.call 
(Executors.java:511)
     [12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
     [13] 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 
(ScheduledThreadPoolExecutor.java:180)
     [14] 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run 
(ScheduledThreadPoolExecutor.java:293)
     [15] java.util.concurrent.ThreadPoolExecutor.runWorker 
(ThreadPoolExecutor.java:1,142)
     [16] java.util.concurrent.ThreadPoolExecutor$Worker.run 
(ThreadPoolExecutor.java:617)
     [17] java.lang.Thread.run (Thread.java:748)
   
   We are now at frame [1], 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp 
(SqlFunctions.java:1,747).
   
   The code is as follows:
   `  public static Timestamp internalToTimestamp(long v)
     {
       return new Timestamp(v - LOCAL_TZ.getOffset(v));
     }
   `
   Printing v when the method is called with windowStart:
   print v
    v = 1544074830000
   Printing v when the method is called with windowEnd:
   print v
    v = 1544074833000
   
   
   
   
   
   After this, execution returns to 
   [1] 
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect 
(TimeWindowPropertyCollector.scala:51)
   
   There we execute:
   `
       if (windowStartOffset.isDefined) {
         output.setField(
           lastFieldPos + windowStartOffset.get,
           SqlFunctions.internalToTimestamp(windowStart))
       }
   
       if (windowEndOffset.isDefined) {
         output.setField(
           lastFieldPos + windowEndOffset.get,
           SqlFunctions.internalToTimestamp(windowEnd))
       }
   `
   
   Before these statements run, the output is 
    output = 
"pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
   After they run, the output is
    output = 
"pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"
   
   So, do you think it is right that
   the long value 1544074830000 is translated to 2018-12-06 05:40:30.0 and
   the long value 1544074833000 is translated to 2018-12-06 05:40:33.0?
   
   I am in China (UTC+8), so I think the timestamps should be 
2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0.
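
As a cross-check of the expected wall-clock values, here is a minimal sketch using `java.time`. The class name `WindowBoundsCheck` and the helper `wallClock` are made up for illustration and are not part of Flink or Calcite; the zone id `Asia/Shanghai` is my assumption for "China".

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper class, not part of Flink or Calcite.
class WindowBoundsCheck {
    // Render epoch milliseconds as wall-clock time in the given zone.
    static LocalDateTime wallClock(long epochMillis, String zoneId) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of(zoneId))
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        long windowStart = 1544074830000L; // value printed by the debugger above
        long windowEnd = 1544074833000L;

        System.out.println(wallClock(windowStart, "UTC"));           // 2018-12-06T05:40:30
        System.out.println(wallClock(windowStart, "Asia/Shanghai")); // 2018-12-06T13:40:30
        System.out.println(wallClock(windowEnd, "UTC"));             // 2018-12-06T05:40:33
        System.out.println(wallClock(windowEnd, "Asia/Shanghai"));   // 2018-12-06T13:40:33
    }
}
```

So the values in the output row match the UTC wall clock, not the local (UTC+8) one.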
   
   
   
   Okay, let us continue.
   
   Now the data will be written to Kafka. Before it is written, it is 
serialized.
   Let us see what happens.
   
   The call stack is as follows:
   [1] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp
 (DateSerializer.java:41)
     [2] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize
 (DateSerializer.java:48)
     [3] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize
 (DateSerializer.java:15)
     [4] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue
 (DefaultSerializerProvider.java:130)
     [5] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue
 (ObjectMapper.java:2,444)
     [6] 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree
 (ObjectMapper.java:2,586)
     [7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert 
(JsonRowSerializationSchema.java:189)
     [8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow 
(JsonRowSerializationSchema.java:128)
     [9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize 
(JsonRowSerializationSchema.java:102)
     [10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize 
(JsonRowSerializationSchema.java:51)
     [11] 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue
 (KeyedSerializationSchemaWrapper.java:46)
     [12] 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke 
(FlinkKafkaProducer010.java:355)
     [13] org.apache.flink.streaming.api.operators.StreamSink.processElement 
(StreamSink.java:56)
     [14] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator
 (OperatorChain.java:560)
     [15] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect
 (OperatorChain.java:535)
     [16] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect
 (OperatorChain.java:515)
     [17] 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect
 (AbstractStreamOperator.java:679)
     [18] 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect
 (AbstractStreamOperator.java:657)
     [19] org.apache.flink.streaming.api.operators.StreamMap.processElement 
(StreamMap.java:41)
     [20] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator
 (OperatorChain.java:560)
     [21] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect
 (OperatorChain.java:535)
     [22] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect
 (OperatorChain.java:515)
     [23] 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect
 (AbstractStreamOperator.java:679)
     [24] 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect
 (AbstractStreamOperator.java:657)
     [25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect 
(TimestampedCollector.java:51)
     [26] org.apache.flink.table.runtime.CRowWrappingCollector.collect 
(CRowWrappingCollector.scala:37)
     [27] org.apache.flink.table.runtime.CRowWrappingCollector.collect 
(CRowWrappingCollector.scala:28)
     [28] DataStreamCalcRule$88.processElement (null)
     [29] org.apache.flink.table.runtime.CRowProcessRunner.processElement 
(CRowProcessRunner.scala:66)
     [30] org.apache.flink.table.runtime.CRowProcessRunner.processElement 
(CRowProcessRunner.scala:35)
     [31] 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement 
(ProcessOperator.java:66)
     [32] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator
 (OperatorChain.java:560)
     [33] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect
 (OperatorChain.java:535)
     [34] 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect
 (OperatorChain.java:515)
     [35] 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect
 (AbstractStreamOperator.java:679)
     [36] 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect
 (AbstractStreamOperator.java:657)
     [37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect 
(TimestampedCollector.java:51)
     [38] 
org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect 
(TimeWindowPropertyCollector.scala:65)
     [39] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply
 (IncrementalAggregateWindowFunction.scala:74)
     [40] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply
 (IncrementalAggregateTimeWindowFunction.scala:72)
     [41] 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply
 (IncrementalAggregateTimeWindowFunction.scala:39)
     [42] 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process
 (InternalSingleValueWindowFunction.java:46)
     [43] 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents
 (WindowOperator.java:550)
     [44] 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime
 (WindowOperator.java:505)
     [45] 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime
 (HeapInternalTimerService.java:266)
     [46] 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run
 (SystemProcessingTimeService.java:281)
     [47] java.util.concurrent.Executors$RunnableAdapter.call 
(Executors.java:511)
     [48] java.util.concurrent.FutureTask.run (FutureTask.java:266)
     [49] 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 
(ScheduledThreadPoolExecutor.java:180)
     [50] 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run 
(ScheduledThreadPoolExecutor.java:293)
     [51] java.util.concurrent.ThreadPoolExecutor.runWorker 
(ThreadPoolExecutor.java:1,142)
     [52] java.util.concurrent.ThreadPoolExecutor$Worker.run 
(ThreadPoolExecutor.java:617)
     [53] java.lang.Thread.run (Thread.java:748)
   
   The code is as follows:
     protected long _timestamp(Date value)
     {
       return value == null ? 0L : value.getTime();
     }
   
   Here, taking windowEnd as the example, the value is
   value = "2018-12-06 05:40:33.0"
    value.getTime() = 1544046033000
   
   As you can see, the initial value was 1544074833000 and the final value is 
1544046033000.
   
   The difference is 28800000 ms, i.e. 8 hours, which is exactly the UTC+8 
offset, because I am in China.
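
The shift can be reproduced outside Flink with a small sketch. It copies the arithmetic of the quoted `internalToTimestamp`, but takes the time zone as a parameter instead of using Calcite's `LOCAL_TZ`, so the result does not depend on the JVM default zone. The class name `OffsetShiftDemo` and the extra parameter are assumptions made for illustration only.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical reproduction, not the Calcite source.
class OffsetShiftDemo {
    // Same arithmetic as the quoted method, with the zone passed explicitly
    // (the real method uses the static LOCAL_TZ field instead).
    static Timestamp internalToTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - tz.getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        long windowEnd = 1544074833000L;

        Timestamp ts = internalToTimestamp(windowEnd, shanghai);
        // DateSerializer._timestamp returns value.getTime(), as shown above.
        long serialized = ts.getTime();

        System.out.println(serialized);             // 1544046033000
        System.out.println(windowEnd - serialized); // 28800000
    }
}
```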
   
------------------------------------------------------------------------------------------
   
   Why? The root cause is SqlFunctions.internalToTimestamp:
     public static Timestamp internalToTimestamp(long v)
     {
       return new Timestamp(v - LOCAL_TZ.getOffset(v));
     }
   
   The code subtracts the LOCAL_TZ offset, which I think is redundant.
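
For comparison, a minimal sketch of what happens when the epoch value is wrapped without subtracting the offset. This is only an illustration of the point above, not the change proposed in the PR, and other code in Calcite or Flink may rely on the shifted representation. The names `NoOffsetConversion`, `toTimestampNoShift` and `format` are hypothetical.

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical illustration, not the PR's fix.
class NoOffsetConversion {
    // Wrap the epoch milliseconds unchanged.
    static Timestamp toTimestampNoShift(long v) {
        return new Timestamp(v);
    }

    // Format with an explicit zone so the output does not depend on the JVM default.
    static String format(Timestamp ts, String zoneId) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
        fmt.setTimeZone(TimeZone.getTimeZone(zoneId));
        return fmt.format(ts);
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L;
        Timestamp ts = toTimestampNoShift(windowEnd);

        System.out.println(ts.getTime() == windowEnd);   // true
        System.out.println(format(ts, "Asia/Shanghai")); // 2018-12-06 13:40:33
    }
}
```

With this variant the value that reaches the JSON serializer through `getTime()` is the original 1544074833000.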
   
   
   
   
   
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> ----------------------------------------------------------------
>
>                 Key: FLINK-11010
>                 URL: https://issues.apache.org/jira/browse/FLINK-11010
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.6.2
>            Reporter: lamber-ken
>            Assignee: lamber-ken
>            Priority: Major
>              Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is obtained simply by calling System.currentTimeMillis(), 
> but the long value is then automatically wrapped in a Timestamp by the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
