realdengziqi opened a new pull request #18982:
URL: https://github.com/apache/flink/pull/18982


   Co-authored-by: Lin WanNi <linwannid...@foxmail.com>
   Co-authored-by: Guo YuanFang <1650213...@qq.com>
   
   ## What is the purpose of the change
   The goal of this PR is to fix a bug: an element cannot be assigned to the 
correct window start if its *timestamp - offset + windowSize < 0*.
   
   This bug is located in 
_org.apache.flink.streaming.api.windowing.windows.TimeWindow_.
   
   The problem is triggered by negative timestamps and is caused by how the 
remainder operator (`%`) works in Java: the result takes the sign of the 
dividend, so a negative dividend never yields a positive remainder.
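   
   As a minimal illustration of that remainder behavior, the sketch below contrasts Java's `%` with `Math.floorMod` for a negative dividend. The class and method names are hypothetical and are not part of Flink.
   
   ```java
   // Hypothetical demo class, not Flink code.
   class RemainderSignDemo {
       // Java's % truncates toward zero: the result has the sign of the dividend.
       static long truncatedRemainder(long dividend, long divisor) {
           return dividend % divisor;
       }

       // Math.floorMod rounds toward negative infinity: the result has the sign of the divisor.
       static long flooredRemainder(long dividend, long divisor) {
           return Math.floorMod(dividend, divisor);
       }

       public static void main(String[] args) {
           System.out.println(truncatedRemainder(-7L, 5L)); // prints -2
           System.out.println(flooredRemainder(-7L, 5L));   // prints 3
       }
   }
   ```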
   
   Specifically, when we calculate the window start of an incoming element 
and _timestamp - offset + windowSize < 0_, the current formula **shifts the 
element right into the next window, whose start time is larger than the 
timestamp of the element**. This violates the principle for assigning elements 
to windows.
   
   
![image](https://user-images.githubusercontent.com/42276568/156824315-b3d277ce-1775-426d-a86e-76535e58b55e.png)
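   
   The right shift can be reproduced with a small sketch. It assumes the formula before this change is `timestamp - (timestamp - offset + windowSize) % windowSize`, which is consistent with the condition stated above but is not quoted in this description. The class and method names are hypothetical.
   
   ```java
   // Hypothetical demo class, not Flink code.
   class LegacyWindowStartDemo {
       // ASSUMPTION: this mirrors the window-start formula before the change.
       static long legacyWindowStart(long timestamp, long offset, long windowSize) {
           return timestamp - (timestamp - offset + windowSize) % windowSize;
       }

       public static void main(String[] args) {
           // timestamp - offset + windowSize = -7 - 0 + 5 = -2, which is negative.
           long start = legacyWindowStart(-7L, 0L, 5L);
           System.out.println(start);        // prints -5
           System.out.println(start <= -7L); // prints false: the window starts after the element
       }
   }
   ```
   
   With a window size of 5 and no offset, an element at -7 would be expected in the window starting at -10, not -5.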
   
   This problem can be fixed by modifying the calculation formula inside the 
getWindowStartWithOffset() method as below:
   ```java
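   public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
       // (timestamp - offset) >> 63 is -1 (all bits set) when the difference is
       // negative and 0 otherwise. Because >> binds tighter than &, the last term
       // is windowSize for a negative difference and 0 otherwise.
       return timestamp
               - (timestamp - offset) % windowSize
               - (windowSize & (timestamp - offset) >> 63);
   }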
   ```
   After this change, elements with negative timestamps are still assigned the 
correct window start, as the graph below shows:
   
![image](https://user-images.githubusercontent.com/42276568/156824911-8625f715-618b-4bf0-a7bd-85f3d7bde21b.png)
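   
   For comparison, here is a sketch that places the proposed formula next to a floored-remainder variant built on `Math.floorMod`. The floored variant is a hypothetical alternative and not what this PR changes; all class and method names are illustrative. One case that may be worth checking is a negative `timestamp - offset` that is an exact multiple of `windowSize`: there the remainder is 0 but the sign term still subtracts a full `windowSize`, so with size 5 and no offset a timestamp of -10 maps to -15 under the proposed formula and to -10 under the floored variant.
   
   ```java
   // Hypothetical comparison class, not Flink code.
   class WindowStartSketch {
       // The formula proposed in this PR, copied for comparison.
       static long proposedWindowStart(long timestamp, long offset, long windowSize) {
           return timestamp
                   - (timestamp - offset) % windowSize
                   - (windowSize & (timestamp - offset) >> 63);
       }

       // Hypothetical alternative: floorMod returns a remainder in [0, windowSize)
       // for a positive windowSize, regardless of the sign of the dividend.
       static long flooredWindowStart(long timestamp, long offset, long windowSize) {
           return timestamp - Math.floorMod(timestamp - offset, windowSize);
       }

       // True if timestamp lies in the half-open window [start, start + windowSize).
       static boolean contains(long start, long windowSize, long timestamp) {
           return start <= timestamp && timestamp < start + windowSize;
       }

       public static void main(String[] args) {
           long windowSize = 5L;
           long offset = 0L;
           for (long ts : new long[] {7L, -7L, -10L}) {
               long proposed = proposedWindowStart(ts, offset, windowSize);
               long floored = flooredWindowStart(ts, offset, windowSize);
               System.out.println(ts
                       + ": proposed=" + proposed
                       + " (contains=" + contains(proposed, windowSize, ts) + ")"
                       + ", floored=" + floored
                       + " (contains=" + contains(floored, windowSize, ts) + ")");
           }
       }
   }
   ```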
   
   ## Brief change log
   - Fix getWindowStartWithOffset in *TimeWindow.java*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as the tests in 
flink-streaming-java (run with `mvn clean verify`).
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   

