chenshzh opened a new pull request, #8379:
URL: https://github.com/apache/hudi/pull/8379

   ### Change Logs
   
   Fix the remaining ser/des concurrency conflicts when async 
compaction/clustering is enabled.
   
   #### 1. Fix conflicts caused by ```WatermarkStatus```
   
   Conflicts can arise from ```Watermark```, ```LatencyMarker``` and 
```WatermarkStatus```. The community has already fixed the first two; 
```WatermarkStatus``` is still unhandled, and this PR fixes it.
       
   
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
   
   ```
       private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
           if (recordOrMark.isRecord()) {
               output.emitRecord(recordOrMark.asRecord());
           } else if (recordOrMark.isWatermark()) {
               statusWatermarkValve.inputWatermark(
                       recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
           } else if (recordOrMark.isLatencyMarker()) {
               output.emitLatencyMarker(recordOrMark.asLatencyMarker());
           } else if (recordOrMark.isWatermarkStatus()) {
               statusWatermarkValve.inputWatermarkStatus(
                       recordOrMark.asWatermarkStatus(),
                       flattenedChannelIndices.get(lastChannel),
                       output);
           } else {
               throw new UnsupportedOperationException("Unknown type of StreamElement");
           }
       }
   ```
   
   #### 2. Because ```WatermarkStatus``` was only introduced in Flink 1.14, 
this PR adds ```SafeAsyncOutputAdapter``` to multiple modules so that both 
Flink 1.13 and Flink 1.14 and above are covered
   
   The ```Output``` is the actual root cause of the concurrency conflicts, so 
this PR refactors it rather than the compaction/clustering operators, to 
minimize the impact.
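
   The idea of guarding the output rather than the operators can be 
illustrated with a minimal, self-contained sketch. It is not the code from this 
PR and does not use Flink classes: ```SimpleOutput```, ```BufferingOutput``` 
and ```SynchronizedOutput``` are hypothetical stand-ins, and the sketch assumes 
the conflict comes from the task thread and an async worker thread calling into 
the same non-thread-safe output.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeOutputSketch {

    /** Hypothetical stand-in for an operator output; NOT Flink's real interface. */
    interface SimpleOutput<T> {
        void collect(T record);

        void emitWatermarkStatus(String status);
    }

    /** Not thread-safe on purpose: mimics an output writing into one shared buffer. */
    static final class BufferingOutput implements SimpleOutput<String> {
        final List<String> buffer = new ArrayList<>();

        @Override
        public void collect(String record) {
            buffer.add("record:" + record);
        }

        @Override
        public void emitWatermarkStatus(String status) {
            buffer.add("status:" + status);
        }
    }

    /** Wrapper that funnels every call through a single lock before delegating. */
    static final class SynchronizedOutput<T> implements SimpleOutput<T> {
        private final SimpleOutput<T> delegate;
        private final Object lock = new Object();

        SynchronizedOutput(SimpleOutput<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(T record) {
            synchronized (lock) {
                delegate.collect(record);
            }
        }

        @Override
        public void emitWatermarkStatus(String status) {
            synchronized (lock) {
                delegate.emitWatermarkStatus(status);
            }
        }
    }

    /** Runs two threads against the wrapped output and returns how many elements were written. */
    public static int runDemo() throws InterruptedException {
        BufferingOutput raw = new BufferingOutput();
        SimpleOutput<String> safe = new SynchronizedOutput<>(raw);

        // Plays the role of the async compaction/clustering worker emitting results.
        Thread asyncWorker = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                safe.collect("commit-" + i);
            }
        });
        // Plays the role of the task thread forwarding watermark status changes.
        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                safe.emitWatermarkStatus(i % 2 == 0 ? "ACTIVE" : "IDLE");
            }
        });

        asyncWorker.start();
        taskThread.start();
        asyncWorker.join();
        taskThread.join();
        return raw.buffer.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("elements written: " + runDemo());
    }
}
```

   Because every call goes through the same lock, the two threads never write 
to the underlying buffer at the same time, and the operators that own the 
threads stay unchanged. Only the object they emit through is swapped.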
   
   ### Impact
   
   none
   
   ### Risk level (write none, low medium or high below)
   
   _If medium or high, explain what verification was done to mitigate the 
risks._
   
   ### Documentation Update
   
   Fix bugs across multiple Flink versions.
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

