This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bdf8d9bb316 [HUDI-5297] Deprecate InternalWriteStatus and re-use 
WriteStatus (#7336)
bdf8d9bb316 is described below

commit bdf8d9bb3165d51ea4ea54c785debd7798332e49
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jun 22 22:35:49 2023 -0400

    [HUDI-5297] Deprecate InternalWriteStatus and re-use WriteStatus (#7336)
    
    * Consolidated HoodieInternalWriteStatus with WriteStatus
    * Added HoodieRecordDelegate to be stored in WriteStatus instead of 
HoodieRecord.
    
    ---------
    
    Co-authored-by: Raymond Xu <[email protected]>
---
 .../hudi/client/FailOnFirstErrorWriteStatus.java   |  11 +-
 .../hudi/client/HoodieInternalWriteStatus.java     | 172 ---------------------
 .../java/org/apache/hudi/client/WriteStatus.java   | 108 +++++++++----
 .../index/inmemory/HoodieInMemoryHashIndex.java    |  12 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   5 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   3 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  14 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  20 +--
 .../java/org/apache/hudi/table/HoodieTable.java    |  15 ++
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  25 ++-
 .../hudi/index/hbase/SparkHoodieHBaseIndex.java    |  15 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |  26 ++--
 .../commit/BulkInsertDataInternalWriterHelper.java |   6 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |   2 +-
 .../hudi/client/TestHoodieInternalWriteStatus.java |  89 -----------
 .../apache/hudi/client/TestHoodieReadClient.java   |   8 +-
 .../org/apache/hudi/client/TestWriteStatus.java    |  59 ++++++-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  17 +-
 .../hudi/client/functional/TestHoodieIndex.java    |   1 -
 .../index/hbase/TestSparkHoodieHBaseIndex.java     |   4 +-
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  |   5 +-
 .../io/storage/row/TestHoodieRowCreateHandle.java  |  11 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |   8 +-
 .../hudi/common/model/HoodieRecordDelegate.java    | 123 +++++++++++++++
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  28 +---
 .../hudi/internal/BaseWriterCommitMessage.java     |   8 +-
 .../internal/DataSourceInternalWriterHelper.java   |   7 +-
 .../HoodieBulkInsertInternalWriterTestBase.java    |   9 +-
 .../internal/HoodieDataSourceInternalWriter.java   |   9 +-
 .../hudi/internal/HoodieWriterCommitMessage.java   |   4 +-
 .../HoodieDataSourceInternalBatchWrite.java        |   9 +-
 .../spark3/internal/HoodieWriterCommitMessage.java |   4 +-
 .../org/apache/hudi/utilities/TableSizeStats.java  |   3 +-
 33 files changed, 416 insertions(+), 424 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
index 40ccae0f000..7643c935f25 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/FailOnFirstErrorWriteStatus.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,13 @@ public class FailOnFirstErrorWriteStatus extends WriteStatus 
{
   @Override
   public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, 
String>> optionalRecordMetadata) {
     LOG.error(String.format("Error writing record %s with data %s and 
optionalRecordMetadata %s", record, record.getData(),
-            optionalRecordMetadata.orElse(Collections.emptyMap()), t));
-    throw new HoodieException("Error writing record " + record + ": " + 
t.getMessage());
+        optionalRecordMetadata.orElse(Collections.emptyMap())), t);
+    throw new HoodieException("Error writing record " + record, t);
+  }
+
+  @Override
+  public void markFailure(String recordKey, String partitionPath, Throwable t) 
{
+    LOG.error(String.format("Error writing record %s and partition %s", 
recordKey, partitionPath), t);
+    throw new HoodieException("Error writing record `" + recordKey + "` 
partitionPath `" + partitionPath + "`", t);
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
deleted file mode 100644
index 103124bf28e..00000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client;
-
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.util.collection.Pair;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Hoodie's internal write status used in datasource implementation of bulk 
insert.
- */
-public class HoodieInternalWriteStatus implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private static final long RANDOM_SEED = 9038412832L;
-
-  private String fileId;
-  private String partitionPath;
-  private List<String> successRecordKeys = new ArrayList<>();
-  private List<Pair<String, Throwable>> failedRecordKeys = new ArrayList<>();
-
-  private HoodieWriteStat stat;
-
-  private long totalRecords = 0;
-  private long totalErrorRecords = 0;
-  private Throwable globalError = null;
-
-  private final double failureFraction;
-  private final boolean trackSuccessRecords;
-  private final transient Random random;
-
-  public HoodieInternalWriteStatus(Boolean trackSuccessRecords, Double 
failureFraction) {
-    this.trackSuccessRecords = trackSuccessRecords;
-    this.failureFraction = failureFraction;
-    this.random = new Random(RANDOM_SEED);
-  }
-
-  public boolean isTrackingSuccessfulWrites() {
-    return trackSuccessRecords;
-  }
-
-  public void markSuccess(String recordKey) {
-    if (trackSuccessRecords) {
-      this.successRecordKeys.add(recordKey);
-    }
-    totalRecords++;
-  }
-
-  public void markSuccess() {
-    totalRecords++;
-  }
-
-  public void markFailure(String recordKey, Throwable t) {
-    if (failedRecordKeys.isEmpty() || (random.nextDouble() <= 
failureFraction)) {
-      failedRecordKeys.add(Pair.of(recordKey, t));
-    }
-    totalRecords++;
-    totalErrorRecords++;
-  }
-
-  public boolean hasErrors() {
-    return failedRecordKeys.size() != 0;
-  }
-
-  public HoodieWriteStat getStat() {
-    return stat;
-  }
-
-  public void setStat(HoodieWriteStat stat) {
-    this.stat = stat;
-  }
-
-  public String getFileId() {
-    return fileId;
-  }
-
-  public void setFileId(String fileId) {
-    this.fileId = fileId;
-  }
-
-  public String getPartitionPath() {
-    return partitionPath;
-  }
-
-  public void setPartitionPath(String partitionPath) {
-    this.partitionPath = partitionPath;
-  }
-
-  public List<String> getSuccessRecordKeys() {
-    return successRecordKeys;
-  }
-
-  public List<Pair<String, Throwable>> getFailedRecordKeys() {
-    return failedRecordKeys;
-  }
-
-  public void setFailedRecordKeys(List<Pair<String, Throwable>> 
failedRecordKeys) {
-    this.failedRecordKeys = failedRecordKeys;
-  }
-
-  public long getTotalRecords() {
-    return totalRecords;
-  }
-
-  public void setTotalRecords(long totalRecords) {
-    this.totalRecords = totalRecords;
-  }
-
-  public long getTotalErrorRecords() {
-    return totalErrorRecords;
-  }
-
-  public void setTotalErrorRecords(long totalErrorRecords) {
-    this.totalErrorRecords = totalErrorRecords;
-  }
-
-  public Throwable getGlobalError() {
-    return globalError;
-  }
-
-  public void setGlobalError(Throwable globalError) {
-    this.globalError = globalError;
-  }
-
-  public void setSuccessRecordKeys(List<String> successRecordKeys) {
-    this.successRecordKeys = successRecordKeys;
-  }
-
-  public double getFailureFraction() {
-    return failureFraction;
-  }
-
-  public boolean isTrackSuccessRecords() {
-    return trackSuccessRecords;
-  }
-
-  @Override
-  public String toString() {
-    return "PartitionPath " + partitionPath + ", FileID " + fileId + ", 
Success records "
-        + totalRecords + ", errored Rows " + totalErrorRecords
-        + ", global error " + (globalError != null);
-  }
-
-  public WriteStatus toWriteStatus() {
-    WriteStatus status = new WriteStatus(trackSuccessRecords, failureFraction);
-    status.setFileId(fileId);
-    status.setTotalRecords(totalRecords);
-    status.setPartitionPath(partitionPath);
-    status.setStat(stat);
-    return status;
-  }
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a606133c6e7..eac71cba191 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -20,12 +20,14 @@ package org.apache.hudi.client;
 
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,12 +35,15 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.time.DateTimeException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * Status of a write operation.
@@ -52,9 +57,9 @@ public class WriteStatus implements Serializable {
 
   private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();
 
-  private final List<HoodieRecord> writtenRecords = new ArrayList<>();
+  private final List<HoodieRecordDelegate> writtenRecordDelegates = new 
ArrayList<>();
 
-  private final List<HoodieRecord> failedRecords = new ArrayList<>();
+  private final List<Pair<HoodieRecordDelegate, Throwable>> failedRecords = 
new ArrayList<>();
 
   private Throwable globalError = null;
 
@@ -87,38 +92,55 @@ public class WriteStatus implements Serializable {
    * Mark write as success, optionally using given parameters for the purpose 
of calculating some aggregate metrics.
    * This method is not meant to cache passed arguments, since WriteStatus 
objects are collected in Spark Driver.
    *
-   * @param record deflated {@code HoodieRecord} containing information that 
uniquely identifies it.
+   * @param record                 deflated {@code HoodieRecord} containing 
information that uniquely identifies it.
    * @param optionalRecordMetadata optional metadata related to data contained 
in {@link HoodieRecord} before deflation.
    */
   public void markSuccess(HoodieRecord record, Option<Map<String, String>> 
optionalRecordMetadata) {
     if (trackSuccessRecords) {
-      writtenRecords.add(record);
+      
writtenRecordDelegates.add(HoodieRecordDelegate.fromHoodieRecord(record));
     }
+    updateStatsForSuccess(optionalRecordMetadata);
+  }
+
+  /**
+   * Used by native write handles like HoodieRowCreateHandle and 
HoodieRowDataCreateHandle.
+   *
+   * @see WriteStatus#markSuccess(HoodieRecord, Option)
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public void markSuccess(HoodieRecordDelegate recordDelegate, 
Option<Map<String, String>> optionalRecordMetadata) {
+    if (trackSuccessRecords) {
+      writtenRecordDelegates.add(Objects.requireNonNull(recordDelegate));
+    }
+    updateStatsForSuccess(optionalRecordMetadata);
+  }
+
+  private void updateStatsForSuccess(Option<Map<String, String>> 
optionalRecordMetadata) {
     totalRecords++;
 
     // get the min and max event time for calculating latency and freshness
-    if (optionalRecordMetadata.isPresent()) {
-      String eventTimeVal = 
optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
-      try {
-        if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
-          int length = eventTimeVal.length();
-          long millisEventTime;
-          // eventTimeVal in seconds unit
-          if (length == 10) {
-            millisEventTime = Long.parseLong(eventTimeVal) * 1000;
-          } else if (length == 13) {
-            // eventTimeVal in millis unit
-            millisEventTime = Long.parseLong(eventTimeVal);
-          } else {
-            throw new IllegalArgumentException("not support event_time 
format:" + eventTimeVal);
-          }
-          long eventTime = 
DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
-          stat.setMinEventTime(eventTime);
-          stat.setMaxEventTime(eventTime);
-        }
-      } catch (DateTimeException | IllegalArgumentException e) {
-        LOG.debug(String.format("Fail to parse event time value: %s", 
eventTimeVal), e);
+    String eventTimeVal = optionalRecordMetadata.orElse(Collections.emptyMap())
+        .getOrDefault(METADATA_EVENT_TIME_KEY, null);
+    if (isNullOrEmpty(eventTimeVal)) {
+      return;
+    }
+    try {
+      int length = eventTimeVal.length();
+      long millisEventTime;
+      // eventTimeVal in seconds unit
+      if (length == 10) {
+        millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+      } else if (length == 13) {
+        // eventTimeVal in millis unit
+        millisEventTime = Long.parseLong(eventTimeVal);
+      } else {
+        throw new IllegalArgumentException("not support event_time format:" + 
eventTimeVal);
       }
+      long eventTime = 
DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
+      stat.setMinEventTime(eventTime);
+      stat.setMaxEventTime(eventTime);
+    } catch (DateTimeException | IllegalArgumentException e) {
+      LOG.debug(String.format("Fail to parse event time value: %s", 
eventTimeVal), e);
     }
   }
 
@@ -126,15 +148,35 @@ public class WriteStatus implements Serializable {
    * Mark write as failed, optionally using given parameters for the purpose 
of calculating some aggregate metrics. This
    * method is not meant to cache passed arguments, since WriteStatus objects 
are collected in Spark Driver.
    *
-   * @param record deflated {@code HoodieRecord} containing information that 
uniquely identifies it.
+   * @param record                 deflated {@code HoodieRecord} containing 
information that uniquely identifies it.
    * @param optionalRecordMetadata optional metadata related to data contained 
in {@link HoodieRecord} before deflation.
    */
   public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, 
String>> optionalRecordMetadata) {
     if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
       // Guaranteed to have at-least one error
-      failedRecords.add(record);
+      failedRecords.add(Pair.of(HoodieRecordDelegate.fromHoodieRecord(record), 
t));
       errors.put(record.getKey(), t);
     }
+    updateStatsForFailure();
+  }
+
+  /**
+   * Used by native write handles like HoodieRowCreateHandle and 
HoodieRowDataCreateHandle.
+   *
+   * @see WriteStatus#markFailure(HoodieRecord, Throwable, Option)
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public void markFailure(String recordKey, String partitionPath, Throwable t) 
{
+    if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
+      // Guaranteed to have at-least one error
+      HoodieRecordDelegate recordDelegate = 
HoodieRecordDelegate.create(recordKey, partitionPath);
+      failedRecords.add(Pair.of(recordDelegate, t));
+      errors.put(recordDelegate.getHoodieKey(), t);
+    }
+    updateStatsForFailure();
+  }
+
+  private void updateStatsForFailure() {
     totalRecords++;
     totalErrorRecords++;
   }
@@ -171,11 +213,11 @@ public class WriteStatus implements Serializable {
     this.globalError = t;
   }
 
-  public List<HoodieRecord> getWrittenRecords() {
-    return writtenRecords;
+  public List<HoodieRecordDelegate> getWrittenRecordDelegates() {
+    return writtenRecordDelegates;
   }
 
-  public List<HoodieRecord> getFailedRecords() {
+  public List<Pair<HoodieRecordDelegate, Throwable>> getFailedRecords() {
     return failedRecords;
   }
 
@@ -211,6 +253,10 @@ public class WriteStatus implements Serializable {
     this.totalErrorRecords = totalErrorRecords;
   }
 
+  public boolean isTrackingSuccessfulWrites() {
+    return trackSuccessRecords;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("WriteStatus {");
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
index 67e201d1afb..67766c033a1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/inmemory/HoodieInMemoryHashIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
@@ -83,15 +84,14 @@ public class HoodieInMemoryHashIndex
       HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
       HoodieTable hoodieTable) {
     return writeStatuses.map(writeStatus -> {
-      for (HoodieRecord record : writeStatus.getWrittenRecords()) {
-        if (!writeStatus.isErrored(record.getKey())) {
-          HoodieKey key = record.getKey();
-          Option<HoodieRecordLocation> newLocation = record.getNewLocation();
+      for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
+        if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+          Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
           if (newLocation.isPresent()) {
-            recordLocationMap.put(key, newLocation.get());
+            recordLocationMap.put(recordDelegate.getHoodieKey(), 
newLocation.get());
           } else {
             // Delete existing index for a deleted record
-            recordLocationMap.remove(key);
+            recordLocationMap.remove(recordDelegate.getHoodieKey());
           }
         }
       }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f57dab94f1f..24c2c193c6d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.MetadataValues;
@@ -305,7 +304,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     stat.setLogFiles(new ArrayList<>(prevStat.getLogFiles()));
 
     this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
-        !hoodieTable.getIndex().isImplicitWithStorage(), 
config.getWriteStatusFailureFraction());
+        hoodieTable.shouldTrackSuccessRecords(), 
config.getWriteStatusFailureFraction());
     this.writeStatus.setFileId(fileId);
     this.writeStatus.setPartitionPath(partitionPath);
     this.writeStatus.setStat(stat);
@@ -560,7 +559,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     // update the new location of the record, so we know where to find it next
     if (needsUpdateLocation()) {
       record.unseal();
-      record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+      record.setNewLocation(newRecordLocation);
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 97fd9878b4f..bdb35641f26 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.IOType;
@@ -152,7 +151,7 @@ public class HoodieCreateHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
 
         // Update the new location of record, so we know where to find it next
         record.unseal();
-        record.setNewLocation(new HoodieRecordLocation(instantTime, 
writeStatus.getFileId()));
+        record.setNewLocation(newRecordLocation);
         record.seal();
 
         recordsWritten++;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 4a635417943..a63050ded5f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -54,7 +55,6 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking;
 
 /**
  * Base class for all write operations logically performed at the file group 
level.
@@ -72,6 +72,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
 
   protected HoodieTimer timer;
   protected WriteStatus writeStatus;
+  protected HoodieRecordLocation newRecordLocation;
   protected final String partitionPath;
   protected final String fileId;
   protected final String writeToken;
@@ -96,18 +97,13 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
     this.writeSchema = overriddenSchema.orElseGet(() -> 
getWriteSchema(config));
     this.writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
     this.timer = HoodieTimer.start();
+    this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     this.schemaOnReadEnabled = 
!isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
     this.recordMerger = config.getRecordMerger();
-
-    // We need to track written records within WriteStatus in two cases:
-    // 1. When the HoodieIndex being used is not implicit with storage
-    // 2. If any of the metadata table partitions (record index, etc) which 
require written record tracking are enabled
-    final boolean trackSuccessRecords = 
!hoodieTable.getIndex().isImplicitWithStorage()
-        || 
getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(), 
hoodieTable.getMetaClient());
     this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
-        trackSuccessRecords, config.getWriteStatusFailureFraction());
+        hoodieTable.shouldTrackSuccessRecords(), 
config.getWriteStatusFailureFraction());
   }
 
   /**
@@ -218,7 +214,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
   public HoodieTableMetaClient getHoodieTableMetaClient() {
     return hoodieTable.getMetaClient();
   }
-  
+
   public String getFileId() {
     return this.fileId;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index aeaf1806936..f9d4514c7e3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -35,10 +35,10 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -1161,20 +1161,19 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   private HoodieData<HoodieRecord> 
getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
     return writeStatuses.flatMap(writeStatus -> {
       List<HoodieRecord> recordList = new LinkedList<>();
-      for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) {
-        if (!writeStatus.isErrored(writtenRecord.getKey())) {
+      for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
+        if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
           HoodieRecord hoodieRecord;
-          HoodieKey key = writtenRecord.getKey();
-          Option<HoodieRecordLocation> newLocation = 
writtenRecord.getNewLocation();
+          Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
           if (newLocation.isPresent()) {
-            if (writtenRecord.getCurrentLocation() != null) {
+            if (recordDelegate.getCurrentLocation().isPresent()) {
               // This is an update, no need to update index if the location 
has not changed
               // newLocation should have the same fileID as currentLocation. 
The instantTimes differ as newLocation's
               // instantTime refers to the current commit which was completed.
-              if 
(!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId()))
 {
+              if 
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
 {
                 final String msg = String.format("Detected update in location 
of record with key %s from %s "
                         + " to %s. The fileID should not change.",
-                    writtenRecord.getKey(), 
writtenRecord.getCurrentLocation(), newLocation.get());
+                    recordDelegate, recordDelegate.getCurrentLocation().get(), 
newLocation.get());
                 LOG.error(msg);
                 throw new HoodieMetadataException(msg);
               } else {
@@ -1183,11 +1182,12 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
               }
             }
 
-            hoodieRecord = 
HoodieMetadataPayload.createRecordIndexUpdate(key.getRecordKey(), 
key.getPartitionPath(),
+            hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
+                recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
                 newLocation.get().getFileId(), 
newLocation.get().getInstantTime());
           } else {
             // Delete existing index for a deleted record
-            hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(key.getRecordKey());
+            hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
           }
 
           recordList.add(hoodieRecord);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index d31a5c44e07..8e5371532ed 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -110,6 +110,7 @@ import static 
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PART
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 
 /**
@@ -1040,6 +1041,20 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     return this.metadata;
   }
 
+  /**
+   * When {@link HoodieTableConfig#POPULATE_META_FIELDS} is enabled,
+   * we need to track written records within WriteStatus in two cases:
+   * <ol>
+   *   <li> When the HoodieIndex being used is not implicit with storage
+   *   <li> If any of the metadata table partitions (record index, etc) which 
require written record tracking are enabled
+   * </ol>
+   */
+  public boolean shouldTrackSuccessRecords() {
+    return config.populateMetaFields()
+        && (!getIndex().isImplicitWithStorage()
+        || 
getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(), 
getMetaClient()));
+  }
+
   public Runnable getPreExecuteRunnable() {
     return Functions.noop();
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 739928d6fe4..6cff94068d6 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -18,16 +18,19 @@
 
 package org.apache.hudi.io.storage.row;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.model.HoodieRowData;
 import org.apache.hudi.client.model.HoodieRowDataCreation;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
@@ -67,7 +70,9 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
   private final String fileId;
   private final boolean preserveHoodieMetadata;
   private final FileSystem fs;
-  protected final HoodieInternalWriteStatus writeStatus;
+  protected final WriteStatus writeStatus;
+  private final HoodieRecordLocation newRecordLocation;
+
   private final HoodieTimer currTimer;
 
   public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId,
@@ -81,11 +86,13 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.fileId = fileId;
+    this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.currTimer = HoodieTimer.start();
     this.fs = table.getMetaClient().getFs();
     this.path = makeNewPath(partitionPath);
-    this.writeStatus = new 
HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+
+    this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
         writeConfig.getWriteStatusFailureFraction());
     writeStatus.setPartitionPath(partitionPath);
     writeStatus.setFileId(fileId);
@@ -129,9 +136,11 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
           record, writeConfig.allowOperationMetadataField(), 
preserveHoodieMetadata);
       try {
         fileWriter.writeRow(recordKey, rowData);
-        writeStatus.markSuccess(recordKey);
+        HoodieRecordDelegate recordDelegate = 
writeStatus.isTrackingSuccessfulWrites()
+            ? HoodieRecordDelegate.create(recordKey, partitionPath, null, 
newRecordLocation) : null;
+        writeStatus.markSuccess(recordDelegate, Option.empty());
       } catch (Throwable t) {
-        writeStatus.markFailure(recordKey, t);
+        writeStatus.markFailure(recordKey, partitionPath, t);
       }
     } catch (Throwable ge) {
       writeStatus.setGlobalError(ge);
@@ -147,13 +156,13 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
   }
 
   /**
-   * Closes the {@link HoodieRowDataCreateHandle} and returns an instance of 
{@link HoodieInternalWriteStatus} containing the stats and
+   * Closes the {@link HoodieRowDataCreateHandle} and returns an instance of 
{@link WriteStatus} containing the stats and
    * status of the writes to this handle.
    *
-   * @return the {@link HoodieInternalWriteStatus} containing the stats and 
status of the writes to this handle.
+   * @return the {@link WriteStatus} containing the stats and status of the 
writes to this handle.
    * @throws IOException
    */
-  public HoodieInternalWriteStatus close() throws IOException {
+  public WriteStatus close() throws IOException {
     fileWriter.close();
     HoodieWriteStat stat = writeStatus.getStat();
     stat.setPartitionPath(partitionPath);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 8e2b8a1e82c..d706070e4c8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -356,22 +357,22 @@ public class SparkHoodieHBaseIndex extends 
HoodieIndex<Object, Object> {
             LOG.info("multiPutBatchSize for this job: " + 
this.multiPutBatchSize);
             // Create a rate limiter that allows `multiPutBatchSize` 
operations per second
             // Any calls beyond `multiPutBatchSize` within a second will be 
rate limited
-            for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
-              if (!writeStatus.isErrored(rec.getKey())) {
-                Option<HoodieRecordLocation> loc = rec.getNewLocation();
+            for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
+              if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+                Option<HoodieRecordLocation> loc = 
recordDelegate.getNewLocation();
                 if (loc.isPresent()) {
-                  if (rec.getCurrentLocation() != null) {
+                  if (recordDelegate.getCurrentLocation().isPresent()) {
                     // This is an update, no need to update index
                     continue;
                   }
-                  Put put = new 
Put(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
+                  Put put = new 
Put(Bytes.toBytes(getHBaseKey(recordDelegate.getRecordKey())));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, 
Bytes.toBytes(loc.get().getInstantTime()));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, 
Bytes.toBytes(loc.get().getFileId()));
-                  put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, 
Bytes.toBytes(rec.getPartitionPath()));
+                  put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, 
Bytes.toBytes(recordDelegate.getPartitionPath()));
                   mutations.add(put);
                 } else {
                   // Delete existing index for a deleted record
-                  Delete delete = new 
Delete(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
+                  Delete delete = new 
Delete(Bytes.toBytes(getHBaseKey(recordDelegate.getRecordKey())));
                   mutations.add(delete);
                 }
               }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index a2a553470f2..04362f94da5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -18,14 +18,17 @@
 
 package org.apache.hudi.io.storage.row;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -75,7 +78,8 @@ public class HoodieRowCreateHandle implements Serializable {
   private final HoodieTimer currTimer;
 
   protected final HoodieInternalRowFileWriter fileWriter;
-  protected final HoodieInternalWriteStatus writeStatus;
+  protected final WriteStatus writeStatus;
+  private final HoodieRecordLocation newRecordLocation;
 
   public HoodieRowCreateHandle(HoodieTable table,
                                HoodieWriteConfig writeConfig,
@@ -104,6 +108,7 @@ public class HoodieRowCreateHandle implements Serializable {
     this.table = table;
     this.writeConfig = writeConfig;
     this.fileId = fileId;
+    this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
 
     this.currTimer = HoodieTimer.start();
 
@@ -118,7 +123,7 @@ public class HoodieRowCreateHandle implements Serializable {
     this.commitTime = UTF8String.fromString(instantTime);
     this.seqIdGenerator = (id) -> HoodieRecord.generateSequenceId(instantTime, 
taskPartitionId, id);
 
-    this.writeStatus = new 
HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+    this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
         writeConfig.getWriteStatusFailureFraction());
     this.shouldPreserveHoodieMetadata = shouldPreserveHoodieMetadata;
 
@@ -184,9 +189,11 @@ public class HoodieRowCreateHandle implements Serializable 
{
         fileWriter.writeRow(recordKey, updatedRow);
         // NOTE: To avoid conversion on the hot-path we only convert 
[[UTF8String]] into [[String]]
         //       in cases when successful records' writes are being tracked
-        writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ? 
recordKey.toString() : null);
+        HoodieRecordDelegate recordDelegate = 
writeStatus.isTrackingSuccessfulWrites()
+            ? HoodieRecordDelegate.create(recordKey.toString(), 
partitionPath.toString(), null, newRecordLocation) : null;
+        writeStatus.markSuccess(recordDelegate, Option.empty());
       } catch (Exception t) {
-        writeStatus.markFailure(recordKey.toString(), t);
+        writeStatus.markFailure(recordKey.toString(), 
partitionPath.toString(), t);
       }
     } catch (Exception e) {
       writeStatus.setGlobalError(e);
@@ -199,7 +206,8 @@ public class HoodieRowCreateHandle implements Serializable {
       // TODO make sure writing w/ and w/o meta fields is consistent 
(currently writing w/o
       //      meta-fields would fail if any record will, while when writing w/ 
meta-fields it won't)
       fileWriter.writeRow(row);
-      writeStatus.markSuccess();
+      // When meta fields are disabled, we do not need to track individual records.
+      writeStatus.markSuccess((HoodieRecordDelegate) null, Option.empty());
     } catch (Exception e) {
       writeStatus.setGlobalError(e);
       throw new HoodieException("Exception thrown while writing spark 
InternalRows to file ", e);
@@ -214,12 +222,12 @@ public class HoodieRowCreateHandle implements 
Serializable {
   }
 
   /**
-   * Closes the {@link HoodieRowCreateHandle} and returns an instance of 
{@link HoodieInternalWriteStatus} containing the stats and
+   * Closes the {@link HoodieRowCreateHandle} and returns an instance of 
{@link WriteStatus} containing the stats and
    * status of the writes to this handle.
    *
-   * @return the {@link HoodieInternalWriteStatus} containing the stats and 
status of the writes to this handle.
+   * @return the {@link WriteStatus} containing the stats and status of the 
writes to this handle.
    */
-  public HoodieInternalWriteStatus close() throws IOException {
+  public WriteStatus close() throws IOException {
     fileWriter.close();
     HoodieWriteStat stat = writeStatus.getStat();
     stat.setPartitionPath(partitionPath);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 6eddc489088..989ceb2ed77 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
@@ -62,7 +62,7 @@ public class BulkInsertDataInternalWriterHelper {
   protected final HoodieWriteConfig writeConfig;
   protected final StructType structType;
   protected final Boolean arePartitionRecordsSorted;
-  protected final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
+  protected final List<WriteStatus> writeStatusList = new ArrayList<>();
   protected final String fileIdPrefix;
   protected final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
   protected final boolean populateMetaFields;
@@ -158,7 +158,7 @@ public class BulkInsertDataInternalWriterHelper {
     }
   }
 
-  public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException 
{
+  public List<WriteStatus> getWriteStatuses() throws IOException {
     close();
     return writeStatusList;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 835a1251aaf..3e5875d0b90 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -180,7 +180,7 @@ object HoodieDatasetBulkInsertHelper
         writer.close()
       }
 
-      writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator
+      writer.getWriteStatuses.asScala.iterator
     }).collect()
     table.getContext.parallelize(writeStatuses.toList.asJava)
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieInternalWriteStatus.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieInternalWriteStatus.java
deleted file mode 100644
index c3f31d816cd..00000000000
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieInternalWriteStatus.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Unit tests {@link HoodieInternalWriteStatus}.
- */
-public class TestHoodieInternalWriteStatus {
-
-  @Test
-  public void testFailureFraction() {
-    HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 
0.1);
-    String fileId = UUID.randomUUID().toString();
-    String partitionPath = UUID.randomUUID().toString();
-    status.setFileId(fileId);
-    status.setPartitionPath(partitionPath);
-    Throwable t = new Exception("some error in writing");
-    for (int i = 0; i < 1000; i++) {
-      status.markFailure(UUID.randomUUID().toString(), t);
-    }
-    // verification
-    assertEquals(fileId, status.getFileId());
-    assertEquals(partitionPath, status.getPartitionPath());
-    assertEquals(1000, status.getTotalErrorRecords());
-    assertTrue(status.getFailedRecordKeys().size() > 0);
-    assertTrue(status.getFailedRecordKeys().size() < 150); // 150 instead of 
100, to prevent flaky test
-    assertTrue(status.hasErrors());
-  }
-
-  @Test
-  public void testSuccessRecordTracking() {
-    boolean[] vals = {true, false};
-    for (boolean trackSuccess : vals) {
-      HoodieInternalWriteStatus status = new 
HoodieInternalWriteStatus(trackSuccess, 1.0);
-      String fileId = UUID.randomUUID().toString();
-      status.setFileId(fileId);
-      String partitionPath = UUID.randomUUID().toString();
-      status.setPartitionPath(partitionPath);
-      Throwable t = new Exception("some error in writing");
-      for (int i = 0; i < 1000; i++) {
-        status.markSuccess(UUID.randomUUID().toString());
-        status.markFailure(UUID.randomUUID().toString(), t);
-      }
-      // verification
-      assertEquals(fileId, status.getFileId());
-      assertEquals(partitionPath, status.getPartitionPath());
-      assertEquals(1000, status.getTotalErrorRecords());
-      assertEquals(1000, status.getFailedRecordKeys().size());
-      assertTrue(status.hasErrors());
-      if (trackSuccess) {
-        assertEquals(1000, status.getSuccessRecordKeys().size());
-      } else {
-        assertTrue(status.getSuccessRecordKeys().isEmpty());
-      }
-      assertEquals(2000, status.getTotalRecords());
-    }
-  }
-
-  @Test
-  public void testGlobalError() {
-    HoodieInternalWriteStatus status = new HoodieInternalWriteStatus(true, 
0.1);
-    Throwable t = new Exception("some error in writing");
-    status.setGlobalError(t);
-    assertEquals(t, status.getGlobalError());
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index eab9f0937c7..9a3fc609503 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -209,8 +209,8 @@ public class TestHoodieReadClient extends 
HoodieClientTestBase {
       // Construct HoodieRecord from the WriteStatus but set HoodieKey, Data 
and HoodieRecordLocation accordingly
       // since they have been modified in the DAG
       JavaRDD<HoodieRecord> recordRDD =
-          
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
-              .map(record -> new HoodieAvroRecord(record.getKey(), 
null)).collect(Collectors.toList()), PARALLELISM);
+          
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+              .map(recordDelegate -> new 
HoodieAvroRecord(recordDelegate.getHoodieKey(), 
null)).collect(Collectors.toList()), PARALLELISM);
       // Should have 100 records in table (check using Index), all in 
locations marked at commit
       SparkRDDReadClient readClient = 
getHoodieReadClient(hoodieWriteConfig.getBasePath());
       List<HoodieRecord> taggedRecords = 
readClient.tagLocation(recordRDD).collect();
@@ -225,8 +225,8 @@ public class TestHoodieReadClient extends 
HoodieClientTestBase {
           Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), 
initCommitTime, numRecords, updateFn, isPrepped, true,
           numRecords, 200, 2);
       recordRDD =
-          
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
-              .map(record -> new HoodieAvroRecord(record.getKey(), 
null)).collect(Collectors.toList()), PARALLELISM);
+          
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
+              .map(recordDelegate -> new 
HoodieAvroRecord(recordDelegate.getHoodieKey(), 
null)).collect(Collectors.toList()), PARALLELISM);
       // Index should be able to locate all updates in correct locations.
       readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
       taggedRecords = readClient.tagLocation(recordRDD).collect();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 99fb76650f9..92459684979 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -24,9 +24,12 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class TestWriteStatus {
+
   @Test
   public void testFailureFraction() {
     WriteStatus status = new WriteStatus(true, 0.1);
@@ -57,7 +61,7 @@ public class TestWriteStatus {
     }
     assertEquals(1000, status.getFailedRecords().size());
     assertTrue(status.hasErrors());
-    assertTrue(status.getWrittenRecords().isEmpty());
+    assertTrue(status.getWrittenRecordDelegates().isEmpty());
     assertEquals(2000, status.getTotalRecords());
   }
 
@@ -145,4 +149,57 @@ public class TestWriteStatus {
     assertNull(status.getStat().getMinEventTime());
 
   }
+
+  @Test
+  public void testFailureFractionExtended() {
+    WriteStatus status = new WriteStatus(true, 0.1);
+    String fileId = UUID.randomUUID().toString();
+    String partitionPath = UUID.randomUUID().toString();
+    status.setFileId(fileId);
+    status.setPartitionPath(partitionPath);
+    Throwable t = new Exception("some error in writing");
+    for (int i = 0; i < 1000; i++) {
+      status.markFailure(mock(HoodieRecord.class), t, Option.empty());
+    }
+    // verification
+    assertEquals(fileId, status.getFileId());
+    assertEquals(partitionPath, status.getPartitionPath());
+    assertTrue(status.getFailedRecords().size() > 0);
+    assertTrue(status.getFailedRecords().size() < 150); // 150 instead of 100, 
to prevent flaky test
+    assertTrue(status.hasErrors());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testSuccessRecordTrackingExtended(boolean trackSuccess) {
+    WriteStatus status = new WriteStatus(trackSuccess, 1.0);
+    String fileId = UUID.randomUUID().toString();
+    status.setFileId(fileId);
+    String partitionPath = UUID.randomUUID().toString();
+    status.setPartitionPath(partitionPath);
+    Throwable t = new Exception("some error in writing");
+    for (int i = 0; i < 1000; i++) {
+      status.markSuccess(mock(HoodieRecord.class), Option.empty());
+      status.markFailure(mock(HoodieRecord.class), t, Option.empty());
+    }
+    // verification
+    assertEquals(fileId, status.getFileId());
+    assertEquals(partitionPath, status.getPartitionPath());
+    assertEquals(1000, status.getFailedRecords().size());
+    assertTrue(status.hasErrors());
+    if (trackSuccess) {
+      assertEquals(1000, status.getWrittenRecordDelegates().size());
+    } else {
+      assertTrue(status.getWrittenRecordDelegates().isEmpty());
+    }
+    assertEquals(2000, status.getTotalRecords());
+  }
+
+  @Test
+  public void testGlobalError() {
+    WriteStatus status = new WriteStatus(true, 0.1);
+    Throwable t = new Exception("some error in writing");
+    status.setGlobalError(t);
+    assertEquals(t, status.getGlobalError());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 9a310c0b441..8183fa4d830 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -54,6 +54,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -515,7 +516,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       List<WriteStatus> statuses = writeFn.apply(client, recordList, 
newCommitTime).collect();
       assertNoWriteErrors(statuses);
       assertEquals(2, statuses.size());
-      
assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+      
assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
           .collect(Collectors.toList()));
     }
   }
@@ -568,7 +569,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
       List<WriteStatus> statuses = writeFn.apply(client, recordList, 
newCommitTime).collect();
       assertNoWriteErrors(statuses);
       assertEquals(2, statuses.size());
-      
assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+      
assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecordDelegates).flatMap(Collection::stream)
           .collect(Collectors.toList()));
     }
   }
@@ -576,18 +577,18 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   /**
    * Assert that there is no duplicate key at the partition level.
    *
-   * @param records List of Hoodie records
+   * @param recordDelegates List of Hoodie record delegates
    */
-  void assertNodupesInPartition(List<HoodieRecord> records) {
+  void assertNodupesInPartition(List<HoodieRecordDelegate> recordDelegates) {
     Map<String, Set<String>> partitionToKeys = new HashMap<>();
-    for (HoodieRecord r : records) {
-      String key = r.getRecordKey();
+    for (HoodieRecordDelegate r : recordDelegates) {
+      String recordKey = r.getRecordKey();
       String partitionPath = r.getPartitionPath();
       if (!partitionToKeys.containsKey(partitionPath)) {
         partitionToKeys.put(partitionPath, new HashSet<>());
       }
-      assertFalse(partitionToKeys.get(partitionPath).contains(key), "key " + 
key + " is duplicate within partition " + partitionPath);
-      partitionToKeys.get(partitionPath).add(key);
+      assertFalse(partitionToKeys.get(partitionPath).contains(recordKey), "key 
" + recordKey + " is duplicate within partition " + partitionPath);
+      partitionToKeys.get(partitionPath).add(recordKey);
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 1edfb2133a6..963274d4a29 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -119,7 +119,6 @@ public class TestHoodieIndex extends TestHoodieMetadataBase 
{
         {IndexType.GLOBAL_SIMPLE, false, false},
         {IndexType.BUCKET, false, true},
         {IndexType.BUCKET, false, false},
-        {IndexType.RECORD_INDEX, false, true},
         {IndexType.RECORD_INDEX, true, true},
         {IndexType.RECORD_INDEX, true, false}
     };
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index a0dc0e01241..d37bee23688 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -800,7 +801,8 @@ public class TestSparkHoodieHBaseIndex extends 
SparkClientFunctionalTestHarness
       // is not implemented via HoodieWriteClient
       JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
         WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
-        w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new 
HoodieAvroRecord(r.getKey(), null), Option.empty()));
+        w.getWrittenRecordDelegates().forEach(r -> newWriteStatus
+            .markSuccess(HoodieRecordDelegate.create(r.getHoodieKey()), 
Option.empty()));
         assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
         newWriteStatus.setStat(new HoodieWriteStat());
         return newWriteStatus;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 05a4de483c1..21e12c74a2d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -33,11 +34,11 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -331,7 +332,7 @@ public class TestHoodieMergeHandle extends 
HoodieClientTestHarness {
           (long) statuses.stream().map(status -> 
status.getStat().getNumInserts()).reduce((a, b) -> a + b).get());
       // Verify all records have location set
       statuses.forEach(writeStatus -> {
-        writeStatus.getWrittenRecords().forEach(r -> {
+        writeStatus.getWrittenRecordDelegates().forEach(r -> {
           // Ensure New Location is set
           assertTrue(r.getNewLocation().isPresent());
         });
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index 47b07075a06..6dc90af78ae 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.io.storage.row;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -109,7 +109,7 @@ public class TestHoodieRowCreateHandle extends 
HoodieClientTestHarness {
       }
 
       // issue writes
-      HoodieInternalWriteStatus writeStatus = 
writeAndGetWriteStatus(inputRows, handle);
+      WriteStatus writeStatus = writeAndGetWriteStatus(inputRows, handle);
 
       fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
       fileNames.add(handle.getFileName());
@@ -161,7 +161,7 @@ public class TestHoodieRowCreateHandle extends 
HoodieClientTestHarness {
       // expected
     }
     // close the create handle
-    HoodieInternalWriteStatus writeStatus = handle.close();
+    WriteStatus writeStatus = handle.close();
 
     List<String> fileNames = new ArrayList<>();
     fileNames.add(handle.getFileName());
@@ -203,7 +203,7 @@ public class TestHoodieRowCreateHandle extends 
HoodieClientTestHarness {
     }
   }
 
-  private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> 
inputRows, HoodieRowCreateHandle handle)
+  private WriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, 
HoodieRowCreateHandle handle)
       throws Exception {
     List<InternalRow> internalRows = 
SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
     // issue writes
@@ -214,11 +214,12 @@ public class TestHoodieRowCreateHandle extends 
HoodieClientTestHarness {
     return handle.close();
   }
 
-  private void assertOutput(HoodieInternalWriteStatus writeStatus, int size, 
String fileId, String partitionPath,
+  private void assertOutput(WriteStatus writeStatus, int size, String fileId, 
String partitionPath,
                             String instantTime, Dataset<Row> inputRows, 
List<String> filenames, List<String> fileAbsPaths, boolean populateMetaFields) {
     assertEquals(writeStatus.getPartitionPath(), partitionPath);
     assertEquals(writeStatus.getTotalRecords(), size);
     assertEquals(writeStatus.getTotalErrorRecords(), 0);
+    assertEquals(writeStatus.getTotalErrorRecords(), 0);
     assertFalse(writeStatus.hasErrors());
     assertNull(writeStatus.getGlobalError());
     assertEquals(writeStatus.getFileId(), fileId);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index d78241aaeb4..1ee019eb713 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -223,6 +225,7 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     return this;
   }
 
+  @Nullable
   public HoodieRecordLocation getCurrentLocation() {
     return currentLocation;
   }
@@ -237,8 +240,9 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     return this;
   }
 
-  public Option<HoodieRecordLocation> getNewLocation() {
-    return Option.ofNullable(this.newLocation);
+  @Nullable
+  public HoodieRecordLocation getNewLocation() {
+    return this.newLocation;
   }
 
   public boolean isCurrentLocationKnown() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
new file mode 100644
index 00000000000..32e81bb7f8b
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import javax.annotation.Nullable;
+
+/**
+ * Delegate for {@link HoodieRecord}.
+ * <p>
+ * This is used when write handles report back write operation's info and 
stats,
+ * instead of passing back the full {@link HoodieRecord}, this lean delegate
+ * of it will be passed instead.
+ */
+public class HoodieRecordDelegate {
+
+  private final HoodieKey hoodieKey;
+
+  /**
+   * Current location of record on storage. Filled in by looking up index
+   */
+  private final Option<HoodieRecordLocation> currentLocation;
+
+  /**
+   * New location of record on storage, after written.
+   */
+  private final Option<HoodieRecordLocation> newLocation;
+
+  private HoodieRecordDelegate(HoodieKey hoodieKey,
+                               @Nullable HoodieRecordLocation currentLocation,
+                               @Nullable HoodieRecordLocation newLocation) {
+    this.hoodieKey = hoodieKey;
+    this.currentLocation = Option.ofNullable(currentLocation);
+    this.newLocation = Option.ofNullable(newLocation);
+  }
+
+  public static HoodieRecordDelegate create(String recordKey, String 
partitionPath) {
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
null, null);
+  }
+
+  public static HoodieRecordDelegate create(String recordKey,
+                                            String partitionPath,
+                                            HoodieRecordLocation 
currentLocation) {
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
currentLocation, null);
+  }
+
+  public static HoodieRecordDelegate create(String recordKey,
+                                            String partitionPath,
+                                            HoodieRecordLocation 
currentLocation,
+                                            HoodieRecordLocation newLocation) {
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
currentLocation, newLocation);
+  }
+
+  public static HoodieRecordDelegate create(HoodieKey key) {
+    return new HoodieRecordDelegate(key, null, null);
+  }
+
+  public static HoodieRecordDelegate create(HoodieKey key, 
HoodieRecordLocation currentLocation) {
+    return new HoodieRecordDelegate(key, currentLocation, null);
+  }
+
+  public static HoodieRecordDelegate create(HoodieKey key,
+                                            HoodieRecordLocation 
currentLocation,
+                                            HoodieRecordLocation newLocation) {
+    return new HoodieRecordDelegate(key, currentLocation, newLocation);
+  }
+
+  public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record) {
+    return new HoodieRecordDelegate(record.getKey(), 
record.getCurrentLocation(), record.getNewLocation());
+  }
+
+  public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record,
+                                                      @Nullable 
HoodieRecordLocation newLocationOverride) {
+    return new HoodieRecordDelegate(record.getKey(), 
record.getCurrentLocation(), newLocationOverride);
+  }
+
+  public String getRecordKey() {
+    return hoodieKey.getRecordKey();
+  }
+
+  public String getPartitionPath() {
+    return hoodieKey.getPartitionPath();
+  }
+
+  public HoodieKey getHoodieKey() {
+    return hoodieKey;
+  }
+
+  public Option<HoodieRecordLocation> getCurrentLocation() {
+    return currentLocation;
+  }
+
+  public Option<HoodieRecordLocation> getNewLocation() {
+    return newLocation;
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieRecordDelegate{"
+        + "hoodieKey=" + hoodieKey
+        + ", currentLocation=" + currentLocation
+        + ", newLocation=" + newLocation
+        + '}';
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 8ef0eab2eaa..56f668e32f0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.sink.bulk;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,7 +42,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 /**
  * Helper class for bulk insert used by Flink.
@@ -61,13 +59,14 @@ public class BulkInsertWriterHelper {
   protected final RowType rowType;
   protected final boolean preserveHoodieMetadata;
   protected final Boolean isInputSorted;
-  private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
+  private final List<WriteStatus> writeStatusList = new ArrayList<>();
   protected HoodieRowDataCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private final String fileIdPrefix;
   private int numFilesWritten = 0;
   protected final Map<String, HoodieRowDataCreateHandle> handles = new 
HashMap<>();
-  @Nullable protected final RowDataKeyGen keyGen;
+  @Nullable
+  protected final RowDataKeyGen keyGen;
 
   public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
                                 String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType) {
@@ -118,11 +117,6 @@ public class BulkInsertWriterHelper {
     }
   }
 
-  public List<HoodieInternalWriteStatus> getHoodieWriteStatuses() throws 
IOException {
-    close();
-    return writeStatusList;
-  }
-
   private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) 
throws IOException {
     if (!handles.containsKey(partitionPath)) { // if there is no handle 
corresponding to the partition path
       // if records are sorted, we can close all existing handles
@@ -196,24 +190,12 @@ public class BulkInsertWriterHelper {
 
   public List<WriteStatus> getWriteStatuses(int taskID) {
     try {
-      return getHoodieWriteStatuses().stream()
-          
.map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
+      close();
+      return writeStatusList;
     } catch (IOException e) {
       throw new HoodieException("Error collect the write status for task [" + 
taskID + "]", e);
     }
   }
 
-  /**
-   * Tool to convert {@link HoodieInternalWriteStatus} into {@link 
WriteStatus}.
-   */
-  private static WriteStatus toWriteStatus(HoodieInternalWriteStatus 
internalWriteStatus) {
-    WriteStatus writeStatus = new WriteStatus(false, 0.1);
-    writeStatus.setStat(internalWriteStatus.getStat());
-    writeStatus.setFileId(internalWriteStatus.getFileId());
-    writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
-    writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
-    
writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
-    return writeStatus;
-  }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
index 88a7921236a..95e7abbca32 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.internal;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 
 import java.util.Arrays;
 import java.util.List;
@@ -28,13 +28,13 @@ import java.util.List;
  */
 public class BaseWriterCommitMessage {
 
-  private List<HoodieInternalWriteStatus> writeStatuses;
+  private List<WriteStatus> writeStatuses;
 
-  public BaseWriterCommitMessage(List<HoodieInternalWriteStatus> 
writeStatuses) {
+  public BaseWriterCommitMessage(List<WriteStatus> writeStatuses) {
     this.writeStatuses = writeStatuses;
   }
 
-  public List<HoodieInternalWriteStatus> getWriteStatuses() {
+  public List<WriteStatus> getWriteStatuses() {
     return writeStatuses;
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 82306490ef7..4ad6c2066a3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
@@ -81,9 +83,10 @@ public class DataSourceInternalWriterHelper {
     LOG.info("Received commit of a data writer = " + message);
   }
 
-  public void commit(List<HoodieWriteStat> writeStatList) {
+  public void commit(List<WriteStatus> writeStatuses) {
     try {
-      writeClient.commitStats(instantTime, 
writeClient.getEngineContext().emptyHoodieData(), writeStatList, 
Option.of(extraMetadata),
+      List<HoodieWriteStat> writeStatList = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+      writeClient.commitStats(instantTime, 
writeClient.getEngineContext().parallelize(writeStatuses), writeStatList, 
Option.of(extraMetadata),
           CommitUtils.getCommitActionType(operationType, 
metaClient.getTableType()));
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
index 30cd8ec8df8..413c2a4d0b4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -85,12 +85,12 @@ public class HoodieBulkInsertInternalWriterTestBase extends 
HoodieClientTestHarn
     return getConfigBuilder(basePath, 
timelineServicePort).withProperties(properties).build();
   }
 
-  protected void assertWriteStatuses(List<HoodieInternalWriteStatus> 
writeStatuses, int batches, int size,
+  protected void assertWriteStatuses(List<WriteStatus> writeStatuses, int 
batches, int size,
                                      Option<List<String>> fileAbsPaths, 
Option<List<String>> fileNames) {
     assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, 
fileNames, false);
   }
 
-  protected void assertWriteStatuses(List<HoodieInternalWriteStatus> 
writeStatuses, int batches, int size, boolean areRecordsSorted,
+  protected void assertWriteStatuses(List<WriteStatus> writeStatuses, int 
batches, int size, boolean areRecordsSorted,
                                      Option<List<String>> fileAbsPaths, 
Option<List<String>> fileNames, boolean isHiveStylePartitioning) {
     if (areRecordsSorted) {
       assertEquals(batches, writeStatuses.size());
@@ -112,7 +112,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends 
HoodieClientTestHarn
     }
 
     int counter = 0;
-    for (HoodieInternalWriteStatus writeStatus : writeStatuses) {
+    for (WriteStatus writeStatus : writeStatuses) {
       // verify write status
       String actualPartitionPathFormat = isHiveStylePartitioning ? 
SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=%s" : "%s";
       assertEquals(String.format(actualPartitionPathFormat, 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]), 
writeStatus.getPartitionPath());
@@ -123,6 +123,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends 
HoodieClientTestHarn
       }
       assertNull(writeStatus.getGlobalError());
       assertEquals(writeStatus.getTotalErrorRecords(), 0);
+      assertEquals(writeStatus.getTotalErrorRecords(), 0);
       assertFalse(writeStatus.hasErrors());
       assertNotNull(writeStatus.getFileId());
       String fileId = writeStatus.getFileId();
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index 11f5d5030b4..b3d18894380 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -19,8 +19,7 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 
@@ -87,9 +86,9 @@ public class HoodieDataSourceInternalWriter implements 
DataSourceWriter {
 
   @Override
   public void commit(WriterCommitMessage[] messages) {
-    List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> 
(HoodieWriterCommitMessage) m)
-        .flatMap(m -> 
m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
-    dataSourceInternalWriterHelper.commit(writeStatList);
+    List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> 
(HoodieWriterCommitMessage) m)
+        .flatMap(m -> 
m.getWriteStatuses().stream()).collect(Collectors.toList());
+    dataSourceInternalWriterHelper.commit(writeStatuses);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
index 1644a6d4ccc..d4a9e82d465 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.internal;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
@@ -30,7 +30,7 @@ import java.util.List;
 public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
     implements WriterCommitMessage {
 
-  public HoodieWriterCommitMessage(List<HoodieInternalWriteStatus> 
writeStatuses) {
+  public HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
     super(writeStatuses);
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
index fb5f609d79e..be6a1ebe7bf 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java
@@ -19,8 +19,7 @@
 package org.apache.hudi.spark3.internal;
 
 import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.HoodieInternalWriteStatus;
-import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.internal.DataSourceInternalWriterHelper;
@@ -88,9 +87,9 @@ public class HoodieDataSourceInternalBatchWrite implements 
BatchWrite {
 
   @Override
   public void commit(WriterCommitMessage[] messages) {
-    List<HoodieWriteStat> writeStatList = Arrays.stream(messages).map(m -> 
(HoodieWriterCommitMessage) m)
-        .flatMap(m -> 
m.getWriteStatuses().stream().map(HoodieInternalWriteStatus::getStat)).collect(Collectors.toList());
-    dataSourceInternalWriterHelper.commit(writeStatList);
+    List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> 
(HoodieWriterCommitMessage) m)
+        .flatMap(m -> 
m.getWriteStatuses().stream()).collect(Collectors.toList());
+    dataSourceInternalWriterHelper.commit(writeStatuses);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
index 7fe787deb8a..bc2904f1ba1 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.spark3.internal;
 
-import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.internal.BaseWriterCommitMessage;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
 
@@ -30,7 +30,7 @@ import java.util.List;
 public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
     implements WriterCommitMessage {
 
-  public HoodieWriterCommitMessage(List<HoodieInternalWriteStatus> 
writeStatuses) {
+  public HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
     super(writeStatuses);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
index 0df91b4e169..d26c8284191 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -45,10 +45,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;


Reply via email to