lzqdename commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-444762927 let me show how to generate the wrong result --- **background**: processing time in tumbling window flink:1.5.0 the invoke stack is as follows: [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747) [2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53) [3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74) [4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72) [5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39) [6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46) [7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550) [8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505) [9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266) [10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281) [11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) [12] java.util.concurrent.FutureTask.run (FutureTask.java:266) [13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180) [14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142) [16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617) [17] java.lang.Thread.run (Thread.java:748) now ,we are at [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747) and the code is as follows: ` public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - LOCAL_TZ.getOffset(v)); } ` let us print the value of windowStart:v print v v = 1544074830000 let us print the value of windowEnd:v print v v = 1544074833000 after this, come back to [1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51) then,we will execute ` if (windowStartOffset.isDefined) { output.setField( lastFieldPos + windowStartOffset.get, SqlFunctions.internalToTimestamp(windowStart)) } ` before execute,the output is output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null" after execute,the output is output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null" so,do you think the long value 1544074830000 translated to be 2018-12-06 05:40:30.0 long value 1544074833000 translated to be 2018-12-06 05:40:33.0 would be right? I am in China, I think the timestamp should be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0 okay,let us continue now ,the data will be write to kafka,before write ,the data will be serialized let us see what happened! the call stack is as follows: ` [1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41) [2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48) [3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15) [4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130) [5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444) [6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586) [7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189) [8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128) [9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:102) [10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:51) [11] org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue (KeyedSerializationSchemaWrapper.java:46) [12] org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke (FlinkKafkaProducer010.java:355) [13] org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56) [14] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560) [15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535) [16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515) [17] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679) [18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657) [19] org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41) [20] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560) [21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535) [22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515) [23] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679) [24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657) [25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51) [26] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37) [27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28) [28] DataStreamCalcRule$88.processElement (null) [29] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66) [30] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35) [31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66) [32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560) [33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535) [34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515) [35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679) [36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657) [37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51) [38] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:65) [39] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74) [40] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72) [41] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39) [42] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46) [43] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550) [44] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505) [45] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266) [46] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281) [47] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) [48] java.util.concurrent.FutureTask.run (FutureTask.java:266) [49] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180) [50] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [51] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142) [52] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617) [53] java.lang.Thread.run (Thread.java:748) ` and the code is as follows: ` protected long _timestamp(Date value) { return value == null ? 0L : value.getTime(); }` here,use windowEnd for example,the value is value = "2018-12-06 05:40:33.0" value.getTime() = 1544046033000 see,the initial value is 1544074833000 and the final value is 1544046033000 the minus value is 28800000, ---> 8 hours ,because I am in China. ------------------------------------------------------------------------------------------ why? the key reason is SqlFunctions.internalToTimestamp public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - LOCAL_TZ.getOffset(v)); } in the code, It minus the LOCAL_TZ , I think it is redundant!
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services