chenshzh opened a new pull request, #8379:
URL: https://github.com/apache/hudi/pull/8379
### Change Logs
Fix the remaining ser/des (serialization/deserialization) concurrency conflicts
that occur when async compaction/clustering is enabled.
#### 1. Fix conflicts caused by ```WatermarkStatus```
Conflicts can arise from ```Watermark```, ```LatencyMarker``` and
```WatermarkStatus```. The community has already fixed the conflicts caused by
```Watermark``` and ```LatencyMarker```, but not the one caused by
```WatermarkStatus```; this PR fixes that remaining case.
The downstream task dispatches each incoming ```StreamElement``` on its type in
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java:
```java
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(
                recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isWatermarkStatus()) {
        statusWatermarkValve.inputWatermarkStatus(
                recordOrMark.asWatermarkStatus(),
                flattenedChannelIndices.get(lastChannel),
                output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
```
#### 2. Add ```SafeAsyncOutputAdapter``` for Flink 1.13 and Flink 1.14+
Because ```WatermarkStatus``` was only introduced in Flink 1.14, this PR adds a
```SafeAsyncOutputAdapter``` in multiple modules so that both Flink 1.13 and
Flink 1.14+ are covered.
The ```Output``` is the actual root cause of the concurrency conflicts, so this
PR refactors the ```Output``` rather than the compaction/clustering operators,
to minimize the impact of the change.
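The idea of wrapping the `Output` instead of changing the operators can be sketched as follows. This is a minimal, hypothetical illustration, not the PR's actual `SafeAsyncOutputAdapter`: `SimpleOutput`, `SimpleOutputWithStatus`, `SafeOutput`, `SafeOutputWithStatus`, `FramingOutput` and `SafeOutputDemo` are invented stand-ins (Flink's real `Output` interface has different signatures), and the sketch assumes that serializing every emit through one shared lock is an acceptable strategy; the real adapter may handle these emits differently. The base class and subclass mirror the need for a Flink 1.13 variant without `WatermarkStatus` and a Flink 1.14+ variant with it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Flink 1.13-style Output (NOT Flink's real interface).
interface SimpleOutput<T> {
    void collect(T record);
    void emitWatermark(long timestamp);
    void emitLatencyMarker(long markedTime);
}

// Hypothetical Flink 1.14+ shape: adds a WatermarkStatus emit, modeled as an idle flag.
interface SimpleOutputWithStatus<T> extends SimpleOutput<T> {
    void emitWatermarkStatus(boolean idle);
}

// 1.13-style wrapper: every emit is serialized through one shared lock, so an async
// thread and the task thread never write to the delegate at the same time.
class SafeOutput<T> implements SimpleOutput<T> {
    protected final Object lock;
    private final SimpleOutput<T> delegate;

    SafeOutput(SimpleOutput<T> delegate, Object lock) {
        this.delegate = delegate;
        this.lock = lock;
    }

    @Override public void collect(T record) {
        synchronized (lock) { delegate.collect(record); }
    }

    @Override public void emitWatermark(long timestamp) {
        synchronized (lock) { delegate.emitWatermark(timestamp); }
    }

    @Override public void emitLatencyMarker(long markedTime) {
        synchronized (lock) { delegate.emitLatencyMarker(markedTime); }
    }
}

// 1.14+-style wrapper: additionally guards the WatermarkStatus path.
class SafeOutputWithStatus<T> extends SafeOutput<T> implements SimpleOutputWithStatus<T> {
    private final SimpleOutputWithStatus<T> statusDelegate;

    SafeOutputWithStatus(SimpleOutputWithStatus<T> delegate, Object lock) {
        super(delegate, lock);
        this.statusDelegate = delegate;
    }

    @Override public void emitWatermarkStatus(boolean idle) {
        synchronized (lock) { statusDelegate.emitWatermarkStatus(idle); }
    }
}

// Deliberately NOT thread-safe sink: each element is written as two tokens (a type tag,
// then a payload), like a serializer writing a tag and then a body. Unguarded concurrent
// writers could interleave the tokens and leave frames that cannot be decoded.
class FramingOutput implements SimpleOutputWithStatus<String> {
    final List<String> tokens = new ArrayList<>();

    private void write(String tag, String payload) {
        tokens.add(tag);
        tokens.add(tag + ":" + payload);
    }

    @Override public void collect(String record) { write("R", record); }
    @Override public void emitWatermark(long ts) { write("W", Long.toString(ts)); }
    @Override public void emitLatencyMarker(long t) { write("L", Long.toString(t)); }
    @Override public void emitWatermarkStatus(boolean idle) { write("S", Boolean.toString(idle)); }

    // True if every (tag, payload) pair is still adjacent and matching.
    boolean framesIntact() {
        if (tokens.size() % 2 != 0) return false;
        for (int i = 0; i < tokens.size(); i += 2) {
            if (!tokens.get(i + 1).startsWith(tokens.get(i) + ":")) return false;
        }
        return true;
    }
}

public class SafeOutputDemo {
    public static void main(String[] args) throws InterruptedException {
        FramingOutput sink = new FramingOutput();
        SafeOutputWithStatus<String> out = new SafeOutputWithStatus<>(sink, new Object());

        // Simulated async compaction thread emitting commit events.
        Thread async = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) out.collect("commit-" + i);
        });
        async.start();
        // Task thread concurrently emitting watermarks and watermark statuses.
        for (int i = 0; i < 10_000; i++) {
            out.emitWatermark(i);
            out.emitWatermarkStatus(i % 2 == 0);
        }
        async.join();
        System.out.println(sink.framesIntact() + " " + sink.tokens.size()); // prints "true 60000"
    }
}
```

Because the operator only ever sees the wrapped output, the operator logic itself stays untouched; the trade-off in this sketch is that the async thread and the task thread contend on a single lock for every emit.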
### Impact
none
### Risk level (write none, low, medium or high below)
_If medium or high, explain what verification was done to mitigate the
risks._
### Documentation Update
Fixes bugs across multiple Flink versions.
### Contributor's checklist
- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Change Logs and Impact were stated clearly
- [ ] Adequate tests were added if applicable
- [ ] CI passed
--
This is an automated message from the Apache Git Service.