minihippo commented on a change in pull request #3173:
URL: https://github.com/apache/hudi/pull/3173#discussion_r745617761
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java
##########
@@ -74,23 +74,25 @@ public Partitioner getUpsertPartitioner(WorkloadProfile
profile) {
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String
fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException {
LOG.info("Merging updates for commit " + instantTime + " for file " +
fileId);
-
- if (!table.getIndex().canIndexLogFiles() &&
mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+ if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner
!= null
+ && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + instantTime
+ " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
- } else {
+ } else if (recordItr.hasNext()) {
Review comment:
will fix
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java
##########
@@ -74,23 +74,25 @@ public Partitioner getUpsertPartitioner(WorkloadProfile
profile) {
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String
fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException {
LOG.info("Merging updates for commit " + instantTime + " for file " +
fileId);
-
- if (!table.getIndex().canIndexLogFiles() &&
mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+ if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner
!= null
+ && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + instantTime
+ " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
- } else {
+ } else if (recordItr.hasNext()) {
HoodieAppendHandle<?,?,?,?> appendHandle = new
HoodieAppendHandle<>(config, instantTime, table,
partitionPath, fileId, recordItr, taskContextSupplier);
appendHandle.doAppend();
return Collections.singletonList(appendHandle.close()).iterator();
+ } else {
+ return super.handleUpdate(partitionPath, fileId, recordItr);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx,
Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to
base files
- if (table.getIndex().canIndexLogFiles()) {
+ if (table.getIndex().canIndexLogFiles() && recordItr.hasNext()) {
Review comment:
will fix
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org