danny0405 commented on code in PR #13590:
URL: https://github.com/apache/hudi/pull/13590#discussion_r2587578939
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -207,24 +207,32 @@ private void initBuffer() {
private void initWriteFunction() {
final String writeOperation = this.config.get(FlinkOptions.OPERATION);
+ WriteFunction writeFunctionImpl;
switch (WriteOperationType.fromValue(writeOperation)) {
case INSERT:
- this.writeFunction = (records, bucketInfo, instantTime) ->
this.writeClient.insert(records, bucketInfo, instantTime);
+ writeFunctionImpl = (records, bucketInfo, instantTime) ->
this.writeClient.insert(records, bucketInfo, instantTime);
break;
case UPSERT:
case DELETE: // shares the code path with UPSERT
case DELETE_PREPPED:
- this.writeFunction = (records, bucketInfo, instantTime) ->
this.writeClient.upsert(records, bucketInfo, instantTime);
+ writeFunctionImpl = (records, bucketInfo, instantTime) ->
this.writeClient.upsert(records, bucketInfo, instantTime);
break;
case INSERT_OVERWRITE:
- this.writeFunction = (records, bucketInfo, instantTime) ->
this.writeClient.insertOverwrite(records, bucketInfo, instantTime);
+ writeFunctionImpl = (records, bucketInfo, instantTime) ->
this.writeClient.insertOverwrite(records, bucketInfo, instantTime);
break;
case INSERT_OVERWRITE_TABLE:
- this.writeFunction = (records, bucketInfo, instantTime) ->
this.writeClient.insertOverwriteTable(records, bucketInfo, instantTime);
+ writeFunctionImpl = (records, bucketInfo, instantTime) ->
this.writeClient.insertOverwriteTable(records, bucketInfo, instantTime);
break;
default:
throw new RuntimeException("Unsupported write operation : " +
writeOperation);
}
+ this.writeFunction = (records, bucketInfo, instant) -> {
+ if (!records.hasNext()) {
Review Comment:
Do we have a better solution to get rid of the wrapper function?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]