minihippo commented on a change in pull request #3173:
URL: https://github.com/apache/hudi/pull/3173#discussion_r745617761



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java
##########
@@ -74,23 +74,25 @@ public Partitioner getUpsertPartitioner(WorkloadProfile 
profile) {
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String 
fileId,
       Iterator<HoodieRecord<T>> recordItr) throws IOException {
     LOG.info("Merging updates for commit " + instantTime + " for file " + 
fileId);
-
-    if (!table.getIndex().canIndexLogFiles() && 
mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+    if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner 
!= null
+        && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
       LOG.info("Small file corrections for updates for commit " + instantTime 
+ " for file " + fileId);
       return super.handleUpdate(partitionPath, fileId, recordItr);
-    } else {
+    } else if (recordItr.hasNext()) {

Review comment:
       will fix

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java
##########
@@ -74,23 +74,25 @@ public Partitioner getUpsertPartitioner(WorkloadProfile 
profile) {
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String 
fileId,
       Iterator<HoodieRecord<T>> recordItr) throws IOException {
     LOG.info("Merging updates for commit " + instantTime + " for file " + 
fileId);
-
-    if (!table.getIndex().canIndexLogFiles() && 
mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+    if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner 
!= null
+        && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
       LOG.info("Small file corrections for updates for commit " + instantTime 
+ " for file " + fileId);
       return super.handleUpdate(partitionPath, fileId, recordItr);
-    } else {
+    } else if (recordItr.hasNext()) {
       HoodieAppendHandle<?,?,?,?> appendHandle = new 
HoodieAppendHandle<>(config, instantTime, table,
           partitionPath, fileId, recordItr, taskContextSupplier);
       appendHandle.doAppend();
       return Collections.singletonList(appendHandle.close()).iterator();
+    } else {
+      return super.handleUpdate(partitionPath, fileId, recordItr);
     }
   }
 
   @Override
   public Iterator<List<WriteStatus>> handleInsert(String idPfx, 
Iterator<HoodieRecord<T>> recordItr)
       throws Exception {
     // If canIndexLogFiles, write inserts to log files else write inserts to 
base files
-    if (table.getIndex().canIndexLogFiles()) {
+    if (table.getIndex().canIndexLogFiles() && recordItr.hasNext()) {

Review comment:
       will fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to