This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73ff4ea3aa4f perf(metadata): Parse RLI instant time once per batch 
instead of per … (#18965)
73ff4ea3aa4f is described below

commit 73ff4ea3aa4fb0c6b7b3e5be1c0210c72faa532c
Author: voonhous <[email protected]>
AuthorDate: Sat Jun 13 17:37:16 2026 +0800

    perf(metadata): Parse RLI instant time once per batch instead of per … 
(#18965)
    
    * perf(metadata): Parse RLI instant time once per batch instead of per 
record
    
    createRecordIndexUpdate parsed the commit instant time string into epoch
    millis for every record although the instant is constant for the whole
    commit. Add a millis-based overload plus a public
    parseRecordIndexInstantTime helper and hoist the parse out of the
    per-record loops in base-file RLI generation, revived-key processing,
    record key iterators, RLI initialization and the Flink index write
    path. RecordIndexMapper memoizes per write status since its delegate
    locations can carry different instants.
    
    On the read side, getLocationFromRecordIndexInfo formatted the instant
    millis back to a string per looked-up record. Cache the formatting in a
    per-thread bounded map that is invalidated when the JVM default time
    zone changes, since formatDate depends on it.
    
    * review: Print raw epoch millis in the invalid-fileId error message
    
    Reformatting the millis through HoodieInstantTimeGenerator.formatDate
    raised a timezone question and is not always byte-identical to the
    original instant string (legacy second-granularity instants gain the
    compatibility-parse millis suffix), so print the unambiguous raw
    millis instead on this error path.
    
    * review: Drop the read-side formatted-instant cache
    
    Restore the direct per-call formatting in getLocationFromRecordIndexInfo;
    the cache needed zone-change invalidation to stay correct, which is more
    machinery than the formatting cost justifies. The PR is now write-side
    only (parse hoisting).
    
    * review: Print instantTime, not raw millis, in the invalid-fileId error
    
    Log the human-readable instant time in the error message instead of raw
    epoch millis. Reconstruct it only on the cold catch path via
    TimelineUtils.formatDate(new Date(instantTimeMillis)) (the inverse of
    parseDateFromInstantTime), so the hot per-record path keeps using the
    pre-parsed millis and pays nothing.
    
    * docs(metadata): Document how instantTimeMillis must be parsed and its 
timezone
    
    The millis overload of createRecordIndexUpdate now documents that 
instantTimeMillis
    must come from parseRecordIndexInstantTime(String) (delegating to
    TimelineUtils.parseDateFromInstantTime), not hand-constructed, and that the
    instant-time string is interpreted in the JVM default time zone
    (ZoneId.systemDefault()), yielding epoch millis since 1970-01-01T00:00:00Z.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  4 +-
 .../apache/hudi/metadata/RecordIndexMapper.java    |  9 +++-
 .../hudi/metadata/BaseFileRecordParsingUtils.java  |  3 +-
 .../hudi/metadata/HoodieMetadataPayload.java       | 42 ++++++++++++++--
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 10 ++--
 .../hudi/metadata/TestHoodieTableMetadataUtil.java | 58 ++++++++++++++++++++++
 .../hudi/sink/partitioner/index/IndexRowUtils.java |  4 +-
 .../sink/partitioner/index/IndexWriteFunction.java |  4 +-
 8 files changed, 120 insertions(+), 14 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 2131080a0ac2..7e75b71b14f4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1011,11 +1011,11 @@ public abstract class 
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
           .withShouldUseRecordPosition(false)
           .withProps(metaClient.getTableConfig().getProps())
           .build();
-      String baseFileInstantTime = fileSlice.getBaseInstantTime();
+      long baseFileInstantTimeMillis = 
HoodieMetadataPayload.parseRecordIndexInstantTime(fileSlice.getBaseInstantTime());
       return new 
CloseableMappingIterator<>(fileGroupReader.getClosableIterator(), record -> {
         String recordKey = 
readerContext.getRecordContext().getRecordKey(record, requestedSchema);
         return HoodieMetadataPayload.createRecordIndexUpdate(recordKey, 
partition, fileId,
-            baseFileInstantTime, 0);
+            baseFileInstantTimeMillis, 0);
       });
     });
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
index 329e74e60676..4b736768ebc2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
@@ -30,7 +30,9 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Mapper for Record Level Index (RLI).
@@ -45,6 +47,8 @@ public class RecordIndexMapper extends MetadataIndexMapper {
   @Override
   protected List<HoodieRecord> generateRecords(WriteStatus writeStatus) {
     List<HoodieRecord> allRecords = new ArrayList<>();
+    // delegates of one write status share at most a few distinct instants, so 
memoize the parse
+    Map<String, Long> instantTimeMillisCache = new HashMap<>();
     for (HoodieRecordDelegate recordDelegate : 
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
       if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
         if (recordDelegate.isIgnoreIndexUpdate()) {
@@ -68,7 +72,10 @@ public class RecordIndexMapper extends MetadataIndexMapper {
             // Insert new record case
             hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
                 recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
-                newLocation.get().getFileId(), 
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+                newLocation.get().getFileId(),
+                instantTimeMillisCache.computeIfAbsent(
+                    newLocation.get().getInstantTime(), 
HoodieMetadataPayload::parseRecordIndexInstantTime),
+                dataWriteConfig.getWritesFileIdEncoding());
             allRecords.add(hoodieRecord);
           }
         } else {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
index e4d83111ae88..6b92f9dcf39b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
@@ -77,9 +77,10 @@ public class BaseFileRecordParsingUtils {
         recordStatuses);
     List<HoodieRecord> hoodieRecords = new ArrayList<>();
     if (recordStatusListMap.containsKey(RecordStatus.INSERT)) {
+      long instantTimeMillis = 
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
       
hoodieRecords.addAll(recordStatusListMap.get(RecordStatus.INSERT).stream()
           .map(recordKey -> (HoodieRecord) 
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
-              instantTime, writesFileIdEncoding)).collect(toList()));
+              instantTimeMillis, writesFileIdEncoding)).collect(toList()));
     }
 
     if (recordStatusListMap.containsKey(RecordStatus.DELETE)) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index ac509fce962e..325a7a5af6ba 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -60,6 +60,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -649,14 +650,46 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexUpdate(String recordKey, String partition,
                                                                             
String fileId, String instantTime, int fileIdEncoding) {
+    return createRecordIndexUpdate(recordKey, partition, fileId, 
parseRecordIndexInstantTime(instantTime), fileIdEncoding);
+  }
 
-    HoodieKey key = new HoodieKey(recordKey, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
-    long instantTimeMillis = -1;
+  /**
+   * Parses an instant time into the epoch millis stored in record index 
entries.
+   * <p>
+   * Callers creating record index updates for many records of the same commit 
should parse the
+   * instant time once with this method and use the millis-based
+   * {@link #createRecordIndexUpdate(String, String, String, long, int)} 
overload per record.
+   */
+  public static long parseRecordIndexInstantTime(String instantTime) {
     try {
-      instantTimeMillis = 
TimelineUtils.parseDateFromInstantTime(instantTime).getTime();
+      return TimelineUtils.parseDateFromInstantTime(instantTime).getTime();
     } catch (Exception e) {
       throw new HoodieMetadataException("Failed to create metadata payload for 
record index. Instant time parsing for " + instantTime + " failed ", e);
     }
+  }
+
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to insert or update an 
entry for the record index.
+   * <p>
+   * Same as {@link #createRecordIndexUpdate(String, String, String, String, 
int)} but takes the
+   * instant time already parsed to epoch millis, so per-commit callers parse 
it only once.
+   * <p>
+   * {@code instantTimeMillis} must be obtained from {@link 
#parseRecordIndexInstantTime(String)} (which
+   * delegates to {@link TimelineUtils#parseDateFromInstantTime(String)}); it 
should not be constructed by
+   * hand, so that the value stored here matches what the String overload 
would write. The instant-time
+   * string is interpreted in the JVM default time zone ({@link 
java.time.ZoneId#systemDefault()}) and the
+   * result is epoch milliseconds (since 1970-01-01T00:00:00Z).
+   *
+   * @param recordKey         Key of the record
+   * @param partition         Name of the partition which contains the record
+   * @param fileId            fileId which contains the record
+   * @param instantTimeMillis epoch millis of the instant when the record was 
added, as returned by
+   *                          {@link #parseRecordIndexInstantTime(String)}
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexUpdate(String recordKey, String partition,
+                                                                            
String fileId, long instantTimeMillis, int fileIdEncoding) {
+
+    HoodieKey key = new HoodieKey(recordKey, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
     if (fileIdEncoding == 0) {
       // Data file names have a -D suffix to denote the index (D = integer) of 
the file written
       // In older HUID versions the file index was missing
@@ -672,8 +705,9 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
           fileIndex = Integer.parseInt(fileId.substring(index + 1));
         }
       } catch (Exception e) {
+        // reconstruct the instant time only on this cold error path; the hot 
per-record path keeps the pre-parsed millis
         throw new HoodieMetadataException(String.format("Invalid UUID or 
index: fileID=%s, partition=%s, instantTime=%s",
-            fileId, partition, instantTime), e);
+            fileId, partition, TimelineUtils.formatDate(new 
Date(instantTimeMillis))), e);
       }
 
       HoodieMetadataPayload payload = new HoodieMetadataPayload(recordKey,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 0ede3ae5015e..23f20fa4efea 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -144,6 +144,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -940,8 +941,9 @@ public class HoodieTableMetadataUtil {
               Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
               Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
               // Process revived keys to create updates
+              long instantTimeMillis = 
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
               List<HoodieRecord> revivedRecords = revivedKeys.stream()
-                  .map(recordKey -> 
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId, 
instantTime, writesFileIdEncoding))
+                  .map(recordKey -> 
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partitionPath, fileId, 
instantTimeMillis, writesFileIdEncoding))
                   .collect(Collectors.toList());
               // Process deleted keys to create deletes
               List<HoodieRecord> deletedRecords = deletedKeys.stream()
@@ -2499,7 +2501,7 @@ public class HoodieTableMetadataUtil {
       fileId = originalFileId;
     }
 
-    final java.util.Date instantDate = new java.util.Date(instantTime);
+    final Date instantDate = new Date(instantTime);
     return new HoodieRecordGlobalLocation(partition, 
HoodieInstantTimeGenerator.formatDate(instantDate), fileId);
   }
 
@@ -2615,6 +2617,8 @@ public class HoodieTableMetadataUtil {
                                                                         String 
instantTime,
                                                                         
boolean isPartitionedRLI
   ) {
+    // the delete iterator never reads the instant time, so only the update 
path pays the parse
+    final long instantTimeMillis = forDelete ? -1L : 
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
     return new ClosableIterator<HoodieRecord>() {
       @Override
       public void close() {
@@ -2630,7 +2634,7 @@ public class HoodieTableMetadataUtil {
       public HoodieRecord next() {
         return forDelete
             ? 
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next(), 
partition, isPartitionedRLI)
-            : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId, instantTime, 0);
+            : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId, instantTimeMillis, 0);
       }
     };
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 95023eebe695..6cab9b9067c3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -21,22 +21,27 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.common.function.SerializableBiFunction;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
@@ -352,4 +357,57 @@ class TestHoodieTableMetadataUtil {
           "STRING should remain supported for record type " + recordType);
     }
   }
+
+  @Test
+  void testCreateRecordIndexUpdateMillisOverloadMatchesStringOverload() {
+    String instantTime = "20260610153045678";
+    long instantTimeMillis = 
HoodieMetadataPayload.parseRecordIndexInstantTime(instantTime);
+
+    // uuid-encoded fileId (encoding 0)
+    HoodieRecord<HoodieMetadataPayload> fromString = 
HoodieMetadataPayload.createRecordIndexUpdate(
+        "rk1", "p1", "49b8b3c8-9e5d-4731-9d51-a2d8e9b5c7f3-0", instantTime, 0);
+    HoodieRecord<HoodieMetadataPayload> fromMillis = 
HoodieMetadataPayload.createRecordIndexUpdate(
+        "rk1", "p1", "49b8b3c8-9e5d-4731-9d51-a2d8e9b5c7f3-0", 
instantTimeMillis, 0);
+    assertEquals(fromString.getKey(), fromMillis.getKey());
+    assertEquals(fromString.getData(), fromMillis.getData());
+
+    // raw fileId (encoding 1)
+    fromString = HoodieMetadataPayload.createRecordIndexUpdate(
+        "rk1", "p1", "some-raw-file-id", instantTime, 1);
+    fromMillis = HoodieMetadataPayload.createRecordIndexUpdate(
+        "rk1", "p1", "some-raw-file-id", instantTimeMillis, 1);
+    assertEquals(fromString.getKey(), fromMillis.getKey());
+    assertEquals(fromString.getData(), fromMillis.getData());
+  }
+
+  @Test
+  void testGetLocationFromRecordIndexInfoFormatsInstantConsistently() {
+    long instantMillis1 = 
HoodieMetadataPayload.parseRecordIndexInstantTime("20260610153045678");
+    long instantMillis2 = 
HoodieMetadataPayload.parseRecordIndexInstantTime("20260610163045678");
+    String expected1 = HoodieInstantTimeGenerator.formatDate(new 
Date(instantMillis1));
+    String expected2 = HoodieInstantTimeGenerator.formatDate(new 
Date(instantMillis2));
+    // repeated and alternating instants must format consistently
+    for (long instantMillis : new long[] {instantMillis1, instantMillis1, 
instantMillis2, instantMillis1}) {
+      HoodieRecordGlobalLocation location = 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+          "p1", 1, -1L, -1L, -1, "some-raw-file-id", instantMillis);
+      assertEquals(instantMillis == instantMillis1 ? expected1 : expected2, 
location.getInstantTime());
+      assertEquals("p1", location.getPartitionPath());
+      assertEquals("some-raw-file-id", location.getFileId());
+    }
+
+    // formatDate follows the JVM default time zone, so the decoded location 
must track a zone
+    // change; the two switches differ in offset, so at least one changes the 
formatted string
+    TimeZone originalTimeZone = TimeZone.getDefault();
+    try {
+      for (String zoneId : new String[] {"UTC", "Asia/Kolkata"}) {
+        TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
+        String expectedInZone = HoodieInstantTimeGenerator.formatDate(new 
Date(instantMillis1));
+        HoodieRecordGlobalLocation location = 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+            "p1", 1, -1L, -1L, -1, "some-raw-file-id", instantMillis1);
+        assertEquals(expectedInZone, location.getInstantTime());
+      }
+    } finally {
+      TimeZone.setDefault(originalTimeZone);
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
index 4ea6f06e7551..5e8a04c2e15c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexRowUtils.java
@@ -70,7 +70,7 @@ public class IndexRowUtils {
     return indexRow;
   }
 
-  public static HoodieRecord convertToHoodieRecord(String instant, RowData 
indexRow, HoodieWriteConfig dataWriteConfig) {
+  public static HoodieRecord convertToHoodieRecord(long instantMillis, RowData 
indexRow, HoodieWriteConfig dataWriteConfig) {
     if (indexRow.getByte(INDEX_TYPE_ORD) == RLI_TYPE) {
       switch (indexRow.getRowKind()) {
         case INSERT:
@@ -78,7 +78,7 @@ public class IndexRowUtils {
               String.valueOf(indexRow.getString(KEY_ORD)),
               String.valueOf(indexRow.getString(PARTITION_ORD)),
               String.valueOf(indexRow.getString(FILE_ID_ORD)),
-              instant,
+              instantMillis,
               dataWriteConfig.getWritesFileIdEncoding());
         case DELETE:
           return HoodieMetadataPayload.createRecordIndexDelete(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index 8e64d58a97b3..066f5aa62eb6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
@@ -174,11 +175,12 @@ public class IndexWriteFunction extends 
AbstractStreamWriteFunction<RowData> {
     HoodieWriteConfig writeConfig = writeClient.getConfig();
     // deduplicate the index records using commit time ordering.
     Map<String, HoodieRecord> keyAndRecordMap = new LinkedHashMap<>();
+    long currentInstantMillis = 
HoodieMetadataPayload.parseRecordIndexInstantTime(this.currentInstant);
     while (rowItr.hasNext()) {
       RowData indexRow = rowItr.next();
       keyAndRecordMap.put(
           dedupKeyExtractor.apply(indexRow),
-          IndexRowUtils.convertToHoodieRecord(this.currentInstant, indexRow, 
writeConfig));
+          IndexRowUtils.convertToHoodieRecord(currentInstantMillis, indexRow, 
writeConfig));
       dataPartitions.add(IndexRowUtils.getPartition(indexRow));
     }
     return Pair.of(new ArrayList<>(keyAndRecordMap.values()), dataPartitions);

Reply via email to