prashantwason commented on a change in pull request #2064:
URL: https://github.com/apache/hudi/pull/2064#discussion_r491331025



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataIndex.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import scala.Tuple2;
+
+/**
+ * An Index which is specific to the Hoodie Metadata Table.
+ *
+ * Hoodie Metadata Table saves information in specific named partitions. So 
the partition and the base file location
+ * during indexing can be determined by looking at the record itself. This 
speeds up the indexing during updates.
+ */
+public class HoodieMetadataIndex<T extends HoodieMetadataPayload> extends 
HoodieIndex<T> {
+
+  /**
+   * Create an index for the Hoodie Metadata Table.
+   *
+   * @param config write config used by the base {@code HoodieIndex}
+   */
+  public HoodieMetadataIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  /**
+   * Returns an RDD mapping each HoodieKey to the partitionPath/fileID pair which contains it,
+   * or {@code Option.empty()} if the key is not found in the index.
+   *
+   * @param hoodieKeys  keys to look up
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   * @return pair RDD of each input key to its optional (partitionPath, fileId) location
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+                                                                                  JavaSparkContext jsc,
+                                                                                  HoodieTable<T> hoodieTable) {
+    // Lookup the location of all the hoodie keys. Keys not present in the index are
+    // dropped by lookupIndex, so a left outer join below restores them with an empty location.
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD = lookupIndex(hoodieKeys, jsc, hoodieTable);
+
+    // Pair every input key with a null placeholder so the left outer join keeps all keys.
+    JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
+
+    return keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
+      Option<Pair<String, String>> partitionPathFileidPair;
+      if (keyLoc._2()._2().isPresent()) {
+        // Location found: pair the key's partition path with the located fileId.
+        partitionPathFileidPair =
+            Option.of(Pair.of(keyLoc._1().getPartitionPath(), keyLoc._2()._2().get().getFileId()));
+      } else {
+        partitionPathFileidPair = Option.empty();
+      }
+      return new Tuple2<>(keyLoc._1(), partitionPathFileidPair);
+    });
+  }
+
+  /**
+   * Tag each record with its current location (partition / fileId), when one exists in the index.
+   *
+   * @param recordRDD   records to tag
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   * @return the input records, tagged with their existing locations where found
+   */
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                              HoodieTable<T> hoodieTable) throws HoodieIndexException {
+    // Key each record by its HoodieKey so the index lookup results can be joined back.
+    JavaPairRDD<HoodieKey, HoodieRecord> keyToRecord = recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyToLocation = lookupIndex(keyToRecord.keys(), jsc, hoodieTable);
+
+    // Left outer join: records whose key is absent from the index get an empty location.
+    return keyToRecord.leftOuterJoin(keyToLocation).values()
+        .map(recordAndLocation ->
+            HoodieIndexUtils.getTaggedRecord(recordAndLocation._1(), Option.ofNullable(recordAndLocation._2().orNull())));
+  }
+
+  /**
+   * Update the index (if required) from the results of the writes into the hoodie table.
+   *
+   * The metadata table stores records in specific named partitions, so record locations can be
+   * derived from the records themselves; no separate index state needs updating here and the
+   * write statuses pass through unchanged.
+   *
+   * @param writeStatusRDD status of the writes
+   * @param jsc            spark context
+   * @param hoodieTable    hoodie table object
+   * @return the input write statuses, unchanged
+   */
+  @Override
+  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
+                                             HoodieTable<T> hoodieTable) throws HoodieIndexException {
+    // Intentional no-op for the metadata index.
+    return writeStatusRDD;
+  }
+
+  /**
+   * Lookup the location for each hoodie key and return the pair<key, 
location> for all record keys already
+   * present and drop the record keys if not present.
+   */
+  private JavaPairRDD<HoodieKey, HoodieRecordLocation> 
lookupIndex(JavaRDD<HoodieKey> hoodieKeys,
+                                                                   
JavaSparkContext jsc,
+                                                                   
HoodieTable<T> hoodieTable) {
+    SliceView fsView = hoodieTable.getSliceView();
+    List<HoodieBaseFile> baseFiles = 
fsView.getLatestFileSlices(HoodieMetadataImpl.METADATA_PARTITION_NAME)

Review comment:
       Yes. Since we don't have other metadata table partitions, I have kept it
simple.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to