nsivabalan commented on code in PR #5581:
URL: https://github.com/apache/hudi/pull/5581#discussion_r1027105129


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -379,10 +380,12 @@ private void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
           + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 
0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
     }
     final HoodieInstant clusteringInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+
     try {
       this.txnManager.beginTransaction(Option.of(clusteringInstant), 
Option.empty());
       finalizeWrite(table, clusteringCommitTime, writeStats);
-      writeTableMetadataForTableServices(table, metadata,clusteringInstant);
+      preCommit(clusteringInstant, metadata);

Review Comment:
   Looking at this fix, I was bumbed that we did not have preCommit even for 
compaction. (preCommit is where we do conflict resolution).
   
   Probably it was designed that way. 
   1. For compaction: conflict resolution is done during compaction planning. 
So, there is no other condition with which we might abort compaction during 
completion. so probably thats why it was left out. 
   2. For clustering: for most common scenario (update reject strategy), again, 
an incoming write  if overlaps w/ clustering file groups(to be replaced), it 
will be aborted right away. And new file groups that are being created by 
clustering is not known to any new writes until clustering completes. So, there 
is no real necessity to do conflict resolution during clustering completion. 
   
   Did you identify any other gaps wrt conflict resolution ? 
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -249,7 +242,36 @@ private void 
mayBeHandleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayloa
 
   protected abstract Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKey(String key, String partitionName);
 
-  protected abstract List<Pair<String, 
Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> 
key, String partitionName);
+  protected abstract Map<String, HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeys(List<String> keys, String partitionName);
+
+  /**
+   * Reads record keys from record-level index.
+   *
+   * If the Metadata Table is not enabled, an exception is thrown to 
distinguish this from the absence of the key.
+   *
+   * @param recordKeys The list of record keys to read
+   */
+  @Override
+  public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys) {

Review Comment:
   List of records is gonna scale. we need to fix this as HoodieData.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -41,13 +43,16 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
 
   public static final String METADATA_PREFIX = "hoodie.metadata";
 
-  // Enable the internal Metadata Table which saves file listings
+  // Create the Metadata Table (MDT) which saves file listings
   public static final ConfigProperty<Boolean> ENABLE = ConfigProperty
       .key(METADATA_PREFIX + ".enable")
       .defaultValue(false)
       .sinceVersion("0.7.0")
-      .withDocumentation("Enable the internal metadata table which serves 
table metadata like level file listings");
+      .withDocumentation("Create and use the metadata table (MDT) which serves 
table metadata like file listings "
+          + "and indexes. If set to true, MDT will be created. Once created, 
this setting only controls if the MDT "
+          + "will be used on reader side. MDT cannot be disabled/removed by 
setting this to false.");

Review Comment:
   actually we have fix this flow in master. ie. if on write path, if metadata 
was disabled and hudi detects that metadata exists, it will delete the metadata 
table. 
   



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -136,23 +161,91 @@ public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(Str
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to insert or update an 
entry for the record index.
+   *
+   * Each entry maps the key of a single record in HUDI to its location.
+   *
+   * @param recordKey Key of the record
+   * @param partition Name of the partition which contains the record
+   * @param fileId fileId which contains the record
+   * @param instantTime instantTime when the record was added
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexUpdate(String recordKey, String partition,
+      String fileId, String instantTime) {
+    HoodieKey key = new HoodieKey(recordKey, 
MetadataPartitionType.RECORD_INDEX.partitionPath());
+    // Data file names have a -D suffix to denote the index (D = integer) of 
the file written
+    // In older HUID versions the file index was missing
+    final UUID uuid;
+    final int fileIndex;
+    try {
+      if (fileId.length() == 36) {
+        uuid = UUID.fromString(fileId);
+        fileIndex = MISSING_FILEINDEX;
+      } else {
+        final int index = fileId.lastIndexOf("-");
+        uuid = UUID.fromString(fileId.substring(0, index));
+        fileIndex = Integer.parseInt(fileId.substring(index + 1));
+      }
+    } catch (Exception e) {
+      throw new HoodieMetadataException(String.format("Invalid UUID or index: 
fileID=%s, partition=%s, instantTIme=%s",
+          fileId, partition, instantTime), e);
+    }
+
+    // Store instantTime as milliseconds sinch epoch. Using an int here allows 
dates till year 2038.
+    Date instantDate;
+    try {
+      instantDate = HoodieActiveTimeline.parseDateFromInstantTime(instantTime);
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Invalid instantTime format: " + 
instantTime, e);
+    }
+
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(recordKey, new 
HoodieRecordIndexInfo(partition,
+        uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 
fileIndex,
+        instantDate.getTime()));
+    return new HoodieRecord<>(key, payload);
+  }
+
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to delete a record in 
the Metadata Table's record index.
+   *
+   * @param recordKey Key of the record to be deleted
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexDelete(String recordKey) {
+    HoodieKey key = new HoodieKey(recordKey, 
MetadataPartitionType.RECORD_INDEX.partitionPath());
+    HoodieMetadataPayload payload = new HoodieMetadataPayload();
+    return new HoodieRecord<>(key, payload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload 
previousRecord) {
-    ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+    if (previousRecord.type != type) {
+      throw new HoodieMetadataException("Cannot combine " + 
previousRecord.type  + " with " + type);
+    }
+    if (!previousRecord.key.equals(key)) {
+      throw new HoodieMetadataException("Cannot combine record with key " + 
previousRecord.key + " with " + key);
+    }
 
     switch (type) {
       case PARTITION_LIST:
       case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case RECORD_INDEX:
+        // TODO: does not work with updates
+        if (previousRecord.recordIndexInfo.getInstantTime() != 
recordIndexInfo.getInstantTime()) {
+          throw new HoodieMetadataException(String.format("InstantTime for %s 
should not change from %s to %s", previousRecord.key,
+                previousRecord, toString()));
+        }
+        // TODO: This does not work with clustering
+        if 
(!previousRecord.getRecordGlobalLocation().equals(getRecordGlobalLocation())) {
+          throw new HoodieMetadataException(String.format("Location for %s 
should not change from %s to %s", previousRecord.key,

Review Comment:
   why do we need this check. w/ clustering, records could move from 1 location 
to another right. any harm in removing this? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -169,19 +169,23 @@ protected void commit(HoodieData<HoodieRecord> 
hoodieDataRecords, String partiti
     metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
 
-  /**
-   * Tag each record with the location in the given partition.
-   *
-   * The record is tagged with respective file slice's location based on its 
record key.
-   */
-  private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, 
String partitionName, int numFileGroups) {
-    List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
partitionName);
-    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, 
String.format("Invalid number of file groups: found=%d, required=%d", 
fileSlices.size(), numFileGroups));
-
-    return recordsRDD.map(r -> {
-      FileSlice slice = 
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
 numFileGroups));
-      r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-      return r;
-    });
+  @Override
+  protected void commit(HoodieData<HoodieRecord> records, String instantTime, 
boolean canTriggerTableService) {
+    commitInternal(records, instantTime, canTriggerTableService, 
Option.empty());
+  }
+
+  @Override
+  protected void commit(HoodieData<HoodieRecord> records, String instantTime, 
String partitionName,
+      int fileGroupCount) {
+    LOG.info("Performing bulk insert for partition " + partitionName + " with 
" + fileGroupCount + " file groups");
+    SparkHoodieMetadataBulkInsertPartitioner partitioner = new 
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);

Review Comment:
   good optimization 👏 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to tbe already tagged with the 
appropriate file slice.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner<JavaRDD<HoodieRecord>>, Serializable {
+
+  private class FileGroupPartitioner extends Partitioner {
+    private int numFileGroups;
+
+    public FileGroupPartitioner(int numFileGroups) {
+      this.numFileGroups = numFileGroups;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+      return ((Tuple2<Integer, String>)key)._1;
+    }
+
+    @Override
+    public int numPartitions() {
+      return numFileGroups;
+    }
+  }
+
+  // The file group count in the partition
+  private int fileGroupCount;
+  // FileIDs for the various partitions
+  private List<String> fileIDPfxs;
+
+  public SparkHoodieMetadataBulkInsertPartitioner(int fileGroupCount) {
+    this.fileGroupCount = fileGroupCount;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> 
records, int outputSparkPartitions) {
+    Comparator<Tuple2<Integer, String>> keyComparator = 
(Comparator<Tuple2<Integer, String>> & Serializable)(t1, t2) -> {
+      return t1._2.compareTo(t2._2);
+    };
+
+    // Partition the records by their location
+    JavaRDD<HoodieRecord> partitionedRDD = records
+        .keyBy(r -> new Tuple2<Integer, 
String>(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), 
fileGroupCount), r.getRecordKey()))
+        .repartitionAndSortWithinPartitions(new 
FileGroupPartitioner(fileGroupCount), keyComparator)
+        .map(t -> t._2);
+    ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() <= 
fileGroupCount,
+        String.format("Partitioned RDD has more partitions %d than the 
fileGroupCount %d", partitionedRDD.getNumPartitions(), fileGroupCount));
+
+    fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+      // Due to partitioning, all record in the partition should have same 
fileID
+      List<String> fileIds = new ArrayList<>(1);
+      if (recordItr.hasNext()) {
+        HoodieRecord record = recordItr.next();
+        final String fileID = record.getCurrentLocation().getFileId();
+        // Remove the write-token from the fileID as we need to return only 
the prefix
+        int index = fileID.lastIndexOf("-");

Review Comment:
   lets move this to FSUtils or something like PathUtils. i.e. parsing fileID 
given a file name



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java:
##########
@@ -79,4 +89,21 @@ public void close() {
   public long getTotalRecords() {
     return parquetUtils.getRowCount(conf, path);
   }
+
+  @Override
+  public Iterator<String> getRecordKeyIterator() throws IOException {
+    Iterator<R> recordIterator = 
getRecordIterator(HoodieAvroUtils.getRecordKeySchema());
+    return new Iterator<String>() {
+      @Override
+      public boolean hasNext() {
+        return recordIterator.hasNext();
+      }
+
+      @Override
+      public String next() {

Review Comment:
   we can probably use parquetUtils.readRowKeys()



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to tbe already tagged with the 
appropriate file slice.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner<JavaRDD<HoodieRecord>>, Serializable {
+
+  private class FileGroupPartitioner extends Partitioner {
+    private int numFileGroups;
+
+    public FileGroupPartitioner(int numFileGroups) {
+      this.numFileGroups = numFileGroups;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+      return ((Tuple2<Integer, String>)key)._1;
+    }
+
+    @Override
+    public int numPartitions() {
+      return numFileGroups;
+    }
+  }
+
+  // The file group count in the partition
+  private int fileGroupCount;
+  // FileIDs for the various partitions
+  private List<String> fileIDPfxs;
+
+  public SparkHoodieMetadataBulkInsertPartitioner(int fileGroupCount) {
+    this.fileGroupCount = fileGroupCount;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> 
records, int outputSparkPartitions) {
+    Comparator<Tuple2<Integer, String>> keyComparator = 
(Comparator<Tuple2<Integer, String>> & Serializable)(t1, t2) -> {
+      return t1._2.compareTo(t2._2);
+    };
+
+    // Partition the records by their location
+    JavaRDD<HoodieRecord> partitionedRDD = records
+        .keyBy(r -> new Tuple2<Integer, 
String>(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), 
fileGroupCount), r.getRecordKey()))
+        .repartitionAndSortWithinPartitions(new 
FileGroupPartitioner(fileGroupCount), keyComparator)
+        .map(t -> t._2);
+    ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() <= 
fileGroupCount,
+        String.format("Partitioned RDD has more partitions %d than the 
fileGroupCount %d", partitionedRDD.getNumPartitions(), fileGroupCount));
+
+    fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+      // Due to partitioning, all record in the partition should have same 
fileID
+      List<String> fileIds = new ArrayList<>(1);
+      if (recordItr.hasNext()) {
+        HoodieRecord record = recordItr.next();
+        final String fileID = record.getCurrentLocation().getFileId();
+        // Remove the write-token from the fileID as we need to return only 
the prefix
+        int index = fileID.lastIndexOf("-");
+        fileIds.add(fileID.substring(0, index));
+      }
+      // Remove the file-index since we want to
+      return fileIds.iterator();
+    }, true).collect();
+    ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == 
fileIDPfxs.size(),
+        String.format("Generated fileIDPfxs (%d) are lesser in size than the 
partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions()));
+
+    return partitionedRDD;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+
+  @Override
+  public List<String> generateFileIDPfxs(int numPartitions) {

Review Comment:
   bcoz, within SparkBulkInsertHelper, we are not leveraging this new method. 
looks like we are generating inline there. 
   L110 in SparkBulkInsertHelper
   ```
       // generate new file ID prefixes for each output partition
       final List<String> fileIDPrefixes =
           IntStream.range(0, parallelism).mapToObj(i -> 
FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
   ```



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to tbe already tagged with the 
appropriate file slice.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner<JavaRDD<HoodieRecord>>, Serializable {
+
+  private class FileGroupPartitioner extends Partitioner {
+    private int numFileGroups;
+
+    public FileGroupPartitioner(int numFileGroups) {
+      this.numFileGroups = numFileGroups;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+      return ((Tuple2<Integer, String>)key)._1;
+    }
+
+    @Override
+    public int numPartitions() {
+      return numFileGroups;
+    }
+  }
+
+  // The file group count in the partition
+  private int fileGroupCount;
+  // FileIDs for the various partitions
+  private List<String> fileIDPfxs;
+
+  public SparkHoodieMetadataBulkInsertPartitioner(int fileGroupCount) {
+    this.fileGroupCount = fileGroupCount;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> 
records, int outputSparkPartitions) {
+    Comparator<Tuple2<Integer, String>> keyComparator = 
(Comparator<Tuple2<Integer, String>> & Serializable)(t1, t2) -> {
+      return t1._2.compareTo(t2._2);
+    };
+
+    // Partition the records by their location
+    JavaRDD<HoodieRecord> partitionedRDD = records
+        .keyBy(r -> new Tuple2<Integer, 
String>(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), 
fileGroupCount), r.getRecordKey()))
+        .repartitionAndSortWithinPartitions(new 
FileGroupPartitioner(fileGroupCount), keyComparator)
+        .map(t -> t._2);
+    ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() <= 
fileGroupCount,
+        String.format("Partitioned RDD has more partitions %d than the 
fileGroupCount %d", partitionedRDD.getNumPartitions(), fileGroupCount));
+
+    fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+      // Due to partitioning, all record in the partition should have same 
fileID
+      List<String> fileIds = new ArrayList<>(1);
+      if (recordItr.hasNext()) {
+        HoodieRecord record = recordItr.next();
+        final String fileID = record.getCurrentLocation().getFileId();
+        // Remove the write-token from the fileID as we need to return only 
the prefix
+        int index = fileID.lastIndexOf("-");
+        fileIds.add(fileID.substring(0, index));
+      }
+      // Remove the file-index since we want to
+      return fileIds.iterator();
+    }, true).collect();
+    ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == 
fileIDPfxs.size(),
+        String.format("Generated fileIDPfxs (%d) are lesser in size than the 
partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions()));
+
+    return partitionedRDD;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+
+  @Override
+  public List<String> generateFileIDPfxs(int numPartitions) {

Review Comment:
   I am not sure where is this used. can you throw some light. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -470,10 +520,116 @@ private boolean 
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
     // During bootstrap, the list of files to be committed can be huge. So 
creating a HoodieCommitMetadata out of these
     // large number of files and calling the existing 
update(HoodieCommitMetadata) function does not scale well.
     // Hence, we have a special commit just for the bootstrap scenario.
-    bootstrapCommit(dirInfoList, createInstantTime);
+    bootstrapCommit(dirInfoList, createInstantTime, 1);
+
+    final long totalFiles = dirInfoList.stream().mapToLong(d -> 
d.getTotalFiles()).sum();
+
+    metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.INITIALIZE_FILE_LISTING_TIME_STR, 
timer.endTimer()));
+    metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.READ_FILES_COUNT_STR, totalFiles));
+    return true;
+  }
+
+  /**
+   * Bootstrap the record index.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset
+   * @param createInstantTime InstantTime to use for the commit
+   * @param inflightInstantTimestamp
+   * @param partitions List of partitions from which the data files are to be 
read
+   */
+  private boolean bootstrapRecordLevelIndex(HoodieEngineContext engineContext, 
HoodieTableMetaClient dataMetaClient,
+      String createInstantTime, Option<String> inflightInstantTimestamp) 
throws IOException {
+    ValidationUtils.checkState(enabled, "Record level index cannot be 
initialized as Metadata Table is not enabled");
+    ValidationUtils.checkState(dataWriteConfig.createRecordIndex(),
+        "Record level index cannot be initialized as it is not enabled");
+
+    // Starting two timers to time reading of keys and total time to bootstrap
+    HoodieTimer timer = new HoodieTimer().startTimer().startTimer();
+
+    // Collect the list of base files present
+    final List<String> partitions = metadata.getAllPartitionPaths();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          // ignore base files being created due to the inflight operation
+          .filter(baseFile -> !inflightInstantTimestamp.isPresent()
+              || 
!baseFile.getCommitTime().equals(inflightInstantTimestamp.get()))
+          .map(basefile -> Pair.of(partition, 
basefile.getFileName())).collect(Collectors.toList()));
+    }
+
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = 
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = 
estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX.partitionPath(), 
recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, 
dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor());
+    initializeFileGroups(MetadataPartitionType.RECORD_INDEX, 
createInstantTime, fileGroupCount);
+    metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.STAT_COUNT_FILE_GROUP, fileGroupCount));
+
+    if (recordCount > 0) {
+      LOG.info("Initializing record index with " + recordCount + " mappings");
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.READ_RECORDKEYS_TIME_STR, 
timer.endTimer()));
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.READ_RECORDKEYS_COUNT_STR, 
recordCount));
+
+      // tag and commit records
+      records = tagRecordsWithLocation(records, 
MetadataPartitionType.RECORD_INDEX.partitionPath());
+      commit(records, createInstantTime, 
MetadataPartitionType.RECORD_INDEX.partitionPath(), fileGroupCount);
+    }
+
+    metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.INITIALIZE_RECORD_INDEX_TIME_STR, 
timer.endTimer()));
     return true;
   }
 
+  /**
+   * Estimates the file group count to use for a partition.
+   *
+   * @param partitionName Name of the partition for which the file group count 
is to be estimated.
+   * @param recordCount The number of records expected to be written.
+   * @param averageRecordSize Average size of each record to be writen.
+   * @param minFileGroupCount Minimum number of file groups to use.
+   * @param maxFileGroupCount Maximum number of file groups to use.
+   * @param growthFactor By what factor are the records (recordCount) expected 
to grow?
+   * @param maxFileGroupSizeBytes Maximum size of the file group.
+   * @return The estimated number of file groups.
+   */
+  private int estimateFileGroupCount(String partitionName, long recordCount, 
int averageRecordSize, int minFileGroupCount,

Review Comment:
   yes, we should. it definitely helps. 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.execution.PartitionIdPassthrough;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex<T extends HoodieRecordPayload>
+    extends HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> {
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<HoodieRecord<T>> tagLocation(HoodieData<HoodieRecord<T>> 
records, HoodieEngineContext context,
+      HoodieTable hoodieTable) {
+    final int numFileGroups;
+    try {
+      HoodieTableMetaClient metaClient = 
HoodieTableMetadataUtil.getMetadataTableMetaClient(hoodieTable.getMetaClient());
+      numFileGroups = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metaClient,
+          MetadataPartitionType.RECORD_INDEX.partitionPath()).size();
+    } catch (TableNotFoundException e) {
+      // implies that metadata table has not been initialized yet (probably 
the first write on a new table)
+      return records;
+    }
+
+    JavaRDD<HoodieRecord<T>> y = HoodieJavaRDD.getJavaRDD(records).keyBy(r -> 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), 
numFileGroups))
+        .partitionBy(new PartitionIdPassthrough(numFileGroups))
+        .map(t -> t._2);
+    ValidationUtils.checkState(y.getNumPartitions() <= numFileGroups);
+
+    registry.ifPresent(r -> r.add(TAG_LOC_NUM_PARTITIONS, 
records.getNumPartitions()));
+    return HoodieJavaRDD.of(y.mapPartitions(new 
LocationTagFunction(hoodieTable, registry)));
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context,
+                                                HoodieTable hoodieTable) {
+    // This is a no-op as metadata record index updates are automatically 
maintained within the metadata table.
+    return writeStatuses;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return true;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return false;
+  }
+
+  /**
+   * Function that tags each HoodieRecord with an existing location, if known.
+   */
+  class LocationTagFunction implements 
FlatMapFunction<Iterator<HoodieRecord<T>>, HoodieRecord<T>> {
+    private HoodieTable hoodieTable;
+    private Option<Registry> registry;
+
+    public LocationTagFunction(HoodieTable hoodieTable, Option<Registry> 
registry) {
+      this.hoodieTable = hoodieTable;
+      this.registry = registry;
+    }
+
+    @Override
+    public Iterator<HoodieRecord<T>> call(Iterator<HoodieRecord<T>> 
hoodieRecordIterator) {
+      HoodieTimer timer = new HoodieTimer().startTimer();
+
+      List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
+      Map<String, Integer> keyToIndexMap = new HashMap<>();
+      while (hoodieRecordIterator.hasNext()) {
+        HoodieRecord rec = hoodieRecordIterator.next();
+        keyToIndexMap.put(rec.getRecordKey(), taggedRecords.size());
+        taggedRecords.add(rec);
+      }
+
+      List<String> recordKeys = 
keyToIndexMap.keySet().stream().sorted().collect(Collectors.toList());

Review Comment:
   I assume this sorting is to ensure lookup in hfile does not have unnecessary 
seeks. 
   but, we already sort all lookups within HFileDataBlock
   ```
       if (!enableFullScan) {
         // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
         Collections.sort(keys);
       }
   ```
   
   So, I don't see a need to sort here. Or do you mean to sort for some other 
reason? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to