xushiyan commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1228989080


##########
hudi-common/src/main/avro/HoodieMetadata.avsc:
##########
@@ -358,6 +358,46 @@
                 }
             ],
             "default" : null
+        },
+        {
+            "name": "recordIndexMetadata",
+            "doc": "Metadata Index that contains information about record keys 
and their location in the dataset",
+            "type": [
+                "null",
+                 {
+                   "type": "record",
+                   "name": "HoodieRecordIndexInfo",
+                    "fields": [
+                        {
+                            "name": "partition",
+                            "type": "string",
+                            "doc": "Partition which contains the record",
+                            "avro.java.string": "String"
+                        },
+                        {
+                            "name": "fileIdHighBits",
+                            "type": "long",
+                            "doc": "fileId which contains the record (high 64 
bits)"
+                        },
+                        {
+                            "name": "fileIdLowBits",
+                            "type": "long",
+                            "doc": "fileId which contains the record (low 64 
bits)"
+                        },
+                        {
+                            "name": "fileIndex",
+                            "type": "int",
+                            "doc": "index of the file"
+                        },
+                        {
+                            "name": "instantTime",
+                            "type": "long",
+                            "doc": "Timestamp at which record was added"

Review Comment:
   This is in milliseconds, right? The doc should state the unit explicitly.
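
To make the unit question concrete, here is a minimal sketch of turning an instant-time string into epoch milliseconds. It assumes the instant time uses the 17-digit `yyyyMMddHHmmssSSS` layout and is interpreted in UTC; both are assumptions for illustration, and `InstantTimeMillis` is a hypothetical helper, not a Hudi class.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

// Hypothetical helper for illustration only; not part of Hudi.
final class InstantTimeMillis {
  // Built with a builder rather than the single pattern "yyyyMMddHHmmssSSS",
  // because Java 8 cannot parse a fraction that directly follows adjacent digits.
  private static final DateTimeFormatter FORMAT = new DateTimeFormatterBuilder()
      .appendPattern("yyyyMMddHHmmss")
      .appendValue(ChronoField.MILLI_OF_SECOND, 3)
      .toFormatter();

  // Assumption: the instant time is expressed in UTC.
  static long toEpochMillis(String instantTime) {
    return LocalDateTime.parse(instantTime, FORMAT)
        .toInstant(ZoneOffset.UTC)
        .toEpochMilli();
  }
}
```

A schema doc string along the lines of "milliseconds since epoch" would remove the ambiguity this conversion has to guess at.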



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -301,24 +300,10 @@ private void 
validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>
 
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata 
commitMetadata,
                                    HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = 
table.isTableServiceAction(hoodieInstant.getAction(), 
hoodieInstant.getTimestamp());

Review Comment:
   This boolean is never used, but it looks like it was meant for validation: 
only proceed if the instant is a table service action.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -147,6 +154,16 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
 
   private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = 
new Conversions.DecimalConversion();
 
+  // HoodieMetadata record index payload field ids
+  public static final String RECORD_INDEX_FIELD_PARTITION = "partition";
+  public static final String RECORD_INDEX_FIELD_FILEID_HIGH_BITS = 
"fileIdHighBits";
+  public static final String RECORD_INDEX_FIELD_FILEID_LOW_BITS = 
"fileIdLowBits";
+  public static final String RECORD_INDEX_FIELD_FILE_INDEX = "fileIndex";
+  public static final String RECORD_INDEX_FIELD_INSTANT_TIME = "instantTime";
+
+  // FileIndex value saved in record index record when the fileId has no index 
(old format of base filename)
+  private static final int RECORD_INDEX_MISSING_FILEINDEX = -1;

Review Comment:
   ```suggestion
     private static final int RECORD_INDEX_MISSING_FILEINDEX_FALLBACK = -1;
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -123,6 +124,9 @@ public static HoodieWriteConfig createMetadataWriteConfig(
         .withAllowMultiWriteOnSameInstant(true)
         
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
         .withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            
.hfileWriterToAllowDuplicates(writeConfig.hfileWriterToAllowDuplicates())

Review Comment:
   I think we should fail the init, for these reasons:
   - when people use RLI, the table is supposed to have globally unique record 
keys
   - tables with dups are meant for append-only use cases, so RLI is not 
applicable and should not be used
   - when we first launch an experimental feature, we should in general favor 
restricting usage over trying to accommodate everything; we can make it more 
flexible in future iterations
   
   In short, I think in 0.14.0 we should strictly disallow the duplicate 
scenario with RLI, as it breaks the uniqueness contract. Users should dedup 
first or double-check whether RLI is really what they need.
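
A fail-fast check along these lines could look roughly like the sketch below. The class, method, and flag names are hypothetical stand-ins, not Hudi's actual config API; the point is only that the conflicting combination is rejected when the config is built rather than at write time.

```java
// Hypothetical sketch; the names below are illustrative and not Hudi's real API.
final class RecordIndexConfigCheck {

  // Rejects the combination "record index enabled" + "duplicate keys tolerated",
  // since the record index assumes globally unique record keys.
  static void validate(boolean recordIndexEnabled, boolean allowDuplicateKeys) {
    if (recordIndexEnabled && allowDuplicateKeys) {
      throw new IllegalArgumentException(
          "Record index requires globally unique record keys; "
              + "dedup the table first or disable the record index.");
    }
  }
}
```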



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -95,12 +96,18 @@ protected HoodieWriteHandle(HoodieWriteConfig config, 
String instantTime, String
     this.writeSchema = overriddenSchema.orElseGet(() -> 
getWriteSchema(config));
     this.writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
     this.timer = HoodieTimer.start();
-    this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
-        !hoodieTable.getIndex().isImplicitWithStorage(), 
config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
     this.schemaOnReadEnabled = 
!isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
     this.recordMerger = config.getRecordMerger();
+
+    // We need to track written records within WriteStatus in two cases:
+    // 1. When the HoodieIndex being used is not implicit with storage
+    // 2. If any of the metadata table partitions (record index, etc) which 
require written record tracking are enabled
+    final boolean trackSuccessRecords = 
!hoodieTable.getIndex().isImplicitWithStorage()
+        || 
HoodieTableMetadataUtil.getMetadataPartitionsNeedingWriteStatusTracking(config.getMetadataConfig(),
 hoodieTable.getMetaClient());

Review Comment:
   this could be encapsulated in a HoodieTable API say 
`shouldTrackSuccessRecords()`
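
As a rough illustration of that encapsulation, the sketch below moves the two conditions behind a single method. `TableSketch` and its fields are hypothetical stand-ins for `HoodieTable` and its collaborators; only the method name `shouldTrackSuccessRecords()` comes from the comment above.

```java
import java.util.List;

// Hypothetical stand-in for the table abstraction; not Hudi's HoodieTable.
final class TableSketch {
  private final boolean indexImplicitWithStorage;
  private final List<String> partitionsNeedingTracking;

  TableSketch(boolean indexImplicitWithStorage, List<String> partitionsNeedingTracking) {
    this.indexImplicitWithStorage = indexImplicitWithStorage;
    this.partitionsNeedingTracking = partitionsNeedingTracking;
  }

  // Written records must be tracked when the index is not implicit with storage,
  // or when any enabled metadata partition needs the written-record list.
  boolean shouldTrackSuccessRecords() {
    return !indexImplicitWithStorage || !partitionsNeedingTracking.isEmpty();
  }
}
```

Callers such as the write handle would then ask the table one question instead of repeating the boolean expression.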



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
+      return fallbackIndex.tagLocation(records, context, hoodieTable);
+    }
+
+    // final variable required for lamda functions below
+    final int numFileGroups = fileGroupSize;
+
+    // cache the input records if needed

Review Comment:
   I think the code is obvious enough that the comment is redundant.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
+      return fallbackIndex.tagLocation(records, context, hoodieTable);
+    }
+
+    // final variable required for lamda functions below

Review Comment:
   we may skip this comment



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java:
##########
@@ -126,6 +131,15 @@ public boolean canWrite() {
 
   @Override
   public void writeAvro(String recordKey, IndexedRecord record) throws 
IOException {
+    if (prevRecordKey.equals(recordKey)) {
+      // When this config is enabled, do not allow duplicates to be written to 
hFile.
+      LOG.warn("Duplicate recordKey " + recordKey + " found while writing to 
HFile. Record payload " + record);
+      if (this.hfileConfig.allowDuplicatesToBeInserted()) {
+        return;
+      } else {
+        throw new HoodieDuplicateKeyException("Duplicate recordKey " + 
recordKey + " found while writing to HFile.");
+      }
+    }

Review Comment:
   So this caters for tables that already have duplicates or expect to have 
them. Can we derive this from the table config instead, e.g. if a precombine 
field is set, we do not allow dups? That would save an extra config here; an 
advanced config raises the usability hurdle.
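
One way to read that suggestion is sketched below: a writer over sorted keys that decides whether to drop or reject a repeated key from the presence of a precombine field, with no separate flag. `SortedKeyWriterSketch` is hypothetical, and the rule "precombine field set means duplicates are an error" is taken from the comment above as an assumption, not from Hudi's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of duplicate handling for a writer that receives sorted keys.
final class SortedKeyWriterSketch {
  private final boolean dropDuplicates;
  private final List<String> written = new ArrayList<>();
  private String prevKey = null;

  // Assumption: a table with a precombine field expects unique keys, so a
  // duplicate is an error there; without one, duplicates are silently dropped.
  SortedKeyWriterSketch(String precombineField) {
    this.dropDuplicates = precombineField == null || precombineField.isEmpty();
  }

  void write(String key) {
    // Keys arrive sorted, so comparing with the previous key detects duplicates.
    if (key.equals(prevKey)) {
      if (dropDuplicates) {
        return;
      }
      throw new IllegalStateException("Duplicate recordKey " + key + " found while writing.");
    }
    written.add(key);
    prevKey = key;
  }

  List<String> written() {
    return written;
  }
}
```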



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -322,6 +324,7 @@ public static List<HoodieRecord> 
convertMetadataToFilesPartitionRecords(HoodieCo
     
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
 
     // Update files listing records for each individual partition
+    int[] newFileCount = {0};

Review Comment:
   you could use `HoodieAtomicLongAccumulator` for this
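
For context, the single-element `int[]` is a workaround for lambdas only capturing effectively final locals. The sketch below shows the same counting with an accumulator-style object; it uses the JDK's `AtomicLong` as a stand-in so the example stays self-contained, and `NewFileCounter` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; AtomicLong stands in for an accumulator class.
final class NewFileCounter {

  // Sums the number of files across partitions from inside a lambda.
  static long countNewFiles(List<List<String>> filesPerPartition) {
    AtomicLong newFileCount = new AtomicLong();
    filesPerPartition.forEach(files -> newFileCount.addAndGet(files.size()));
    return newFileCount.get();
  }
}
```

Compared with `int[] newFileCount = {0}`, the intent is visible in the type, and the counter stays correct if the loop is ever run from multiple threads.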



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");

Review Comment:
   This validation should have been done when the config was first created, so 
we don't need to worry about it at use time.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -666,6 +691,75 @@ private static HoodieMetadataColumnStats 
mergeColumnStatsRecords(HoodieMetadataC
         .build();
   }
 
+  /**
+   * Create and return a {@code HoodieMetadataPayload} to insert or update an 
entry for the record index.
+   * <p>
+   * Each entry maps the key of a single record in HUDI to its location.
+   *
+   * @param recordKey   Key of the record
+   * @param partition   Name of the partition which contains the record
+   * @param fileId      fileId which contains the record
+   * @param instantTime instantTime when the record was added
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createRecordIndexUpdate(String recordKey, String partition,
+      String fileId, String instantTime) {
+    HoodieKey key = new HoodieKey(recordKey, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+    // Data file names have a -D suffix to denote the index (D = integer) of 
the file written
+    // In older HUID versions the file index was missing
+    final UUID uuid;
+    final int fileIndex;
+    try {
+      if (fileId.length() == 36) {
+        uuid = UUID.fromString(fileId);
+        fileIndex = RECORD_INDEX_MISSING_FILEINDEX;
+      } else {
+        final int index = fileId.lastIndexOf("-");
+        uuid = UUID.fromString(fileId.substring(0, index));
+        fileIndex = Integer.parseInt(fileId.substring(index + 1));
+      }
+    } catch (Exception e) {
+      throw new HoodieMetadataException(String.format("Invalid UUID or index: 
fileID=%s, partition=%s, instantTIme=%s",
+          fileId, partition, instantTime), e);
+    }
+
+    // Store instantTime as milliseconds sinch epoch. Using an int here allows 
dates till year 2038.

Review Comment:
   don't quite get this comment - which one is int?
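
The arithmetic behind the "year 2038" remark can be checked with a small sketch. A signed 32-bit value holds seconds since epoch until January 2038, but it holds milliseconds since epoch for only about 25 days, so a millisecond timestamp needs a `long` (which is what the schema field declares). `EpochRange` is a hypothetical helper for illustration.

```java
import java.time.Instant;

// Hypothetical helper showing the range of a signed 32-bit epoch counter.
final class EpochRange {

  // Latest instant representable when an int counts seconds since epoch.
  static Instant lastInstantForIntSeconds() {
    return Instant.ofEpochSecond(Integer.MAX_VALUE);
  }

  // Latest instant representable when an int counts milliseconds since epoch.
  static Instant lastInstantForIntMillis() {
    return Instant.ofEpochMilli(Integer.MAX_VALUE);
  }
}
```

So the code comment appears to mix two designs: the 2038 limit applies to int seconds, while the stored value is described as milliseconds.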



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of 
tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = 
IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> 
tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, 
HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      
ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));
+      fileGroupSize = 
hoodieTable.getMetadataTable().getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+      ValidationUtils.checkState(fileGroupSize > 0, "Record index should have 
at least one file group");
+    } catch (TableNotFoundException | IllegalStateException e) {
+      // This means that record index has not been initialized.
+      LOG.warn(String.format("Record index not initialized so falling back to 
%s for tagging records", FALLBACK_INDEX_TYPE.name()));
+
+      // Fallback to another index so that tagLocation is still accurate and 
there are no duplicates.
+      HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
+          
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
+      HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");

Review Comment:
   Again, I think config-level validation can and should be done as early as 
the configs are loaded.



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java:
##########
@@ -545,6 +545,33 @@ public void close() {
     }
   }
 
+  @Override
+  public ClosableIterator<String> getRecordKeyIterator() {
+    final HFileScanner scanner = reader.getScanner(false, false);

Review Comment:
   we should improve the names to indicate usage clearly



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -73,35 +71,25 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseTableMetadata.class);
 
-  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  // NOTE: Buffer-size is deliberately set pretty low, since MT internally is 
relying
-  //       on HFile (serving as persisted binary key-value mapping) to do 
caching
-  protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb
-
   protected final transient HoodieEngineContext engineContext;
   protected final SerializablePath dataBasePath;
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
   protected final HoodieMetadataConfig metadataConfig;
-  // Directory used for Spillable Map when merging records
-  protected final String spillableMapDirectory;
 
-  protected boolean isMetadataTableEnabled;
-  protected boolean isBloomFilterIndexEnabled = false;
-  protected boolean isColumnStatsIndexEnabled = false;
+  protected boolean isMetadataTableInitialized;
 
-  protected BaseTableMetadata(HoodieEngineContext engineContext, 
HoodieMetadataConfig metadataConfig,
-                              String dataBasePath, String 
spillableMapDirectory) {
+  protected BaseTableMetadata(HoodieEngineContext engineContext, 
HoodieMetadataConfig metadataConfig, String dataBasePath) {
     this.engineContext = engineContext;
     this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
     this.dataMetaClient = HoodieTableMetaClient.builder()
         .setConf(engineContext.getHadoopConf().get())
         .setBasePath(dataBasePath)
         .build();
-    this.spillableMapDirectory = spillableMapDirectory;
     this.metadataConfig = metadataConfig;
 
-    this.isMetadataTableEnabled = metadataConfig.enabled();
+    this.isMetadataTableInitialized = 
dataMetaClient.getTableConfig().isMetadataTableEnabled();

Review Comment:
   Saving this value may lead to an inconsistency when the MDT is physically 
deleted and hoodie.properties is updated, because this value won't change. 
Should we refresh the metaclient and always call 
`dataMetaClient.getTableConfig().isMetadataTableEnabled()` to get the latest 
value?
   
   Also, we should rename `isMetadataTableEnabled()` to 
`isMetadataTableInitialized()` to distinguish it from reading 
`hoodie.metadata.enable`
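
The staleness concern can be shown with a small sketch that contrasts a value cached at construction with one read on every call. `MetadataStateSketch` and the `BooleanSupplier` standing in for the table config are hypothetical; they only model the pattern, not Hudi's classes.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch contrasting a cached flag with a fresh read.
final class MetadataStateSketch {
  private final boolean cachedAtConstruction;
  private final BooleanSupplier tableConfigFlag;

  MetadataStateSketch(BooleanSupplier tableConfigFlag) {
    this.tableConfigFlag = tableConfigFlag;
    // Snapshot taken once; it never sees later changes to the table config.
    this.cachedAtConstruction = tableConfigFlag.getAsBoolean();
  }

  boolean cachedValue() {
    return cachedAtConstruction;
  }

  // Reads through to the underlying config on every call.
  boolean freshValue() {
    return tableConfigFlag.getAsBoolean();
  }
}
```

If the underlying flag flips after construction, `cachedValue()` keeps returning the old answer while `freshValue()` follows the config.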



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -67,12 +57,20 @@ public static List<String> allPaths() {
     );
   }
 
+  /**
+   * Returns the list of metadata table partitions which require WriteStatus 
to track written records.
+   * <p>
+   * These partitions need the list of written records so that they can update 
their metadata.
+   */
+  public static List<MetadataPartitionType> needWriteStatusTracking() {

Review Comment:
   `partitionsWithWriteStatusTracking()`; needXXX implies boolean return
   
   I would actually suggest making this a property of MetadataPartitionType to 
keep things cohesive.
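
A sketch of that shape: each enum constant carries its own flag, and the list is derived from it. `PartitionTypeSketch` is a hypothetical stand-in for `MetadataPartitionType`; the flag values are illustrative, with only the record index marked as needing tracking, as the PR's code comment mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MetadataPartitionType; flag values are illustrative.
enum PartitionTypeSketch {
  FILES(false),
  COLUMN_STATS(false),
  BLOOM_FILTERS(false),
  RECORD_INDEX(true);

  private final boolean needsWriteStatusTracking;

  PartitionTypeSketch(boolean needsWriteStatusTracking) {
    this.needsWriteStatusTracking = needsWriteStatusTracking;
  }

  // Boolean-returning accessor, matching what a "needs..." name implies.
  boolean needsWriteStatusTracking() {
    return needsWriteStatusTracking;
  }

  // Derived list of all partition types whose flag is set.
  static List<PartitionTypeSketch> partitionsWithWriteStatusTracking() {
    List<PartitionTypeSketch> result = new ArrayList<>();
    for (PartitionTypeSketch type : values()) {
      if (type.needsWriteStatusTracking()) {
        result.add(type);
      }
    }
    return result;
  }
}
```

Adding a new partition type then means deciding the flag once at the constant, rather than updating a separate hard-coded list.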



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -683,12 +683,19 @@ public void initializeBootstrapDirsIfNotExists() throws IOException {
     initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs());
   }
 
+  /**
+   * Reload table config and cache.
+   */
+  public synchronized void reloadTableConfig() {

Review Comment:
   should add `@VisibleForTesting`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+
+/**
+ * Hoodie Index implementation backed by the record index present in the Metadata Table.
+ */
+public class SparkMetadataTableRecordIndex extends HoodieIndex<Object, Object> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMetadataTableRecordIndex.class);
+  // The index to fallback upon when record index is not initialized yet.
+  // This should be a global index like record index so that the behavior of tagging across partitions is not changed.
+  private static final HoodieIndex.IndexType FALLBACK_INDEX_TYPE = IndexType.GLOBAL_SIMPLE;
+
+  public SparkMetadataTableRecordIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public <R> HoodieData<HoodieRecord<R>> tagLocation(HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {
+    int fileGroupSize;
+    try {
+      ValidationUtils.checkState(hoodieTable.getMetaClient().getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.RECORD_INDEX));

Review Comment:
   can this validation be done at instantiation time instead?
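
A rough sketch of what validating at construction could look like follows. `IndexPartitionSketch` and `RecordIndexSketch` are hypothetical names. The sketch also assumes the set of enabled partitions is available when the index is built, which is not the case for the constructor in the diff (it only receives a `HoodieWriteConfig`), so this shows the fail-fast idea only and is not a drop-in change.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical subset of metadata partition types.
enum IndexPartitionSketch { FILES, RECORD_INDEX }

// Hypothetical index that checks its precondition once, when it is created.
class RecordIndexSketch {
  private final Set<IndexPartitionSketch> enabledPartitions;

  RecordIndexSketch(Set<IndexPartitionSketch> enabledPartitions) {
    // Fail fast: reject construction if the record index partition is missing.
    if (!enabledPartitions.contains(IndexPartitionSketch.RECORD_INDEX)) {
      throw new IllegalStateException("Record index partition is not enabled");
    }
    this.enabledPartitions = EnumSet.copyOf(enabledPartitions);
  }

  // Later calls can rely on the invariant instead of re-validating.
  boolean isReady() {
    return enabledPartitions.contains(IndexPartitionSketch.RECORD_INDEX);
  }
}

class RecordIndexDemo {
  public static void main(String[] args) {
    RecordIndexSketch ok = new RecordIndexSketch(EnumSet.of(IndexPartitionSketch.RECORD_INDEX));
    System.out.println("ready=" + ok.isReady()); // ready=true

    try {
      new RecordIndexSketch(EnumSet.of(IndexPartitionSketch.FILES));
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage()); // rejected: Record index partition is not enabled
    }
  }
}
```

One caveat: the diff declares a fallback index type for when the record index is not yet initialized, so a check at construction time would need to stay compatible with that fallback path.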



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


