This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ecbd389  [HUDI-1478] Introduce HoodieBloomIndex to hudi-java-client 
(#2608)
ecbd389 is described below

commit ecbd389a3f9215e219cb19b8641f2faea4fa3ad7
Author: Shen Hong <[email protected]>
AuthorDate: Sun Mar 28 20:28:40 2021 +0800

    [HUDI-1478] Introduce HoodieBloomIndex to hudi-java-client (#2608)
---
 .../hudi/index/bloom/HoodieBaseBloomIndex.java}    |  26 +--
 .../bloom/HoodieBaseBloomIndexCheckFunction.java}  |   4 +-
 .../org/apache/hudi/index/FlinkHoodieIndex.java    |   2 +-
 .../hudi/index/bloom/FlinkHoodieBloomIndex.java    | 235 +--------------------
 .../apache/hudi/index/JavaHoodieBloomIndex.java    |  32 +++
 .../org/apache/hudi/index/JavaHoodieIndex.java     |   4 +-
 6 files changed, 50 insertions(+), 253 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java
similarity index 92%
copy from 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java
index 255a66b..75ab693 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import com.beust.jcommander.internal.Lists;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
@@ -28,15 +29,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.MetadataNotFoundException;
-import org.apache.hudi.index.FlinkHoodieIndex;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieKeyLookupHandle;
 import org.apache.hudi.io.HoodieRangeInfoHandle;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import com.beust.jcommander.internal.Lists;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -44,20 +43,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
 
-/**
- * Indexing mechanism based on bloom filter. Each parquet file includes its 
row_key bloom filter in its metadata.
- */
 @SuppressWarnings("checkstyle:LineLength")
-public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends 
FlinkHoodieIndex<T> {
+public class HoodieBaseBloomIndex<T extends HoodieRecordPayload> extends 
HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
-  private static final Logger LOG = 
LogManager.getLogger(FlinkHoodieBloomIndex.class);
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBaseBloomIndex.class);
 
-  public FlinkHoodieBloomIndex(HoodieWriteConfig config) {
+  public HoodieBaseBloomIndex(HoodieWriteConfig config) {
     super(config);
   }
 
@@ -112,7 +108,7 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
     // Step 3: Obtain a List, for each incoming record, that already exists, 
with the file id,
     // that contains it.
     List<Pair<String, HoodieKey>> fileComparisons =
-            explodeRecordsWithFileComparisons(partitionToFileInfo, 
partitionRecordKeyMap);
+        explodeRecordsWithFileComparisons(partitionToFileInfo, 
partitionRecordKeyMap);
     return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
   }
 
@@ -121,7 +117,7 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
    */
   //TODO duplicate code with spark, we can optimize this method later
   List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> 
partitions, final HoodieEngineContext context,
-                                                             final HoodieTable 
hoodieTable) {
+                                                           final HoodieTable 
hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
@@ -197,7 +193,7 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
       hoodieRecordKeys.forEach(hoodieRecordKey -> {
         indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
hoodieRecordKey).forEach(partitionFileIdPair -> {
           fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
-                  new HoodieKey(hoodieRecordKey, partitionPath)));
+              new HoodieKey(hoodieRecordKey, partitionPath)));
         });
       });
     });
@@ -215,7 +211,7 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
 
     List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new 
ArrayList<>();
 
-    Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new 
HoodieFlinkBloomIndexCheckFunction(hoodieTable, 
config).apply(fileComparisons.iterator());
+    Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new 
HoodieBaseBloomIndexCheckFunction(hoodieTable, 
config).apply(fileComparisons.iterator());
     while (iterator.hasNext()) {
       keyLookupResults.addAll(iterator.next());
     }
@@ -237,7 +233,7 @@ public class FlinkHoodieBloomIndex<T extends 
HoodieRecordPayload> extends FlinkH
    * Tag the <rowKey, filename> back to the original HoodieRecord List.
    */
   protected List<HoodieRecord<T>> tagLocationBacktoRecords(
-          Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, 
List<HoodieRecord<T>> records) {
+      Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, 
List<HoodieRecord<T>> records) {
     Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
     records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
     // Here as the record might have more data than rowKey (some rowKeys' 
fileId is null),
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
similarity index 96%
rename from 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
index a147c14..95a2255 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
@@ -37,14 +37,14 @@ import java.util.List;
  * Function performing actual checking of list containing (fileId, hoodieKeys) 
against the actual files.
  */
 //TODO we can move this class into the hudi-client-common and reuse it for 
spark client
-public class HoodieFlinkBloomIndexCheckFunction
+public class HoodieBaseBloomIndexCheckFunction 
         implements Function<Iterator<Pair<String, HoodieKey>>, 
Iterator<List<KeyLookupResult>>> {
 
   private final HoodieTable hoodieTable;
 
   private final HoodieWriteConfig config;
 
-  public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+  public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
     this.hoodieTable = hoodieTable;
     this.config = config;
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
index ecd6490..272da8c 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
@@ -46,7 +46,7 @@ public abstract class FlinkHoodieIndex<T extends 
HoodieRecordPayload> extends Ho
     super(config);
   }
 
-  public static FlinkHoodieIndex createIndex(HoodieFlinkEngineContext context, 
HoodieWriteConfig config) {
+  public static HoodieIndex createIndex(HoodieFlinkEngineContext context, 
HoodieWriteConfig config) {
     // first use index class config to create index.
     if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
       Object instance = ReflectionUtils.loadClass(config.getIndexClass(), 
config);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
index 255a66b..355dced 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java
@@ -18,248 +18,15 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.MetadataNotFoundException;
-import org.apache.hudi.index.FlinkHoodieIndex;
-import org.apache.hudi.index.HoodieIndexUtils;
-import org.apache.hudi.io.HoodieKeyLookupHandle;
-import org.apache.hudi.io.HoodieRangeInfoHandle;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import com.beust.jcommander.internal.Lists;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toList;
-import static 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
 
 /**
  * Indexing mechanism based on bloom filter. Each parquet file includes its 
row_key bloom filter in its metadata.
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends 
FlinkHoodieIndex<T> {
-
-  private static final Logger LOG = 
LogManager.getLogger(FlinkHoodieBloomIndex.class);
-
+public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends 
HoodieBaseBloomIndex<T> {
   public FlinkHoodieBloomIndex(HoodieWriteConfig config) {
     super(config);
   }
-
-  @Override
-  public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, 
HoodieEngineContext context,
-                                           HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
-    // Step 1: Extract out thinner Map of (partitionPath, recordKey)
-    Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
-    records.forEach(record -> {
-      if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) {
-        
partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey());
-      } else {
-        List<String> recordKeys = Lists.newArrayList();
-        recordKeys.add(record.getRecordKey());
-        partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys);
-      }
-    });
-
-    // Step 2: Lookup indexes for all the partition/recordkey pair
-    Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap =
-        lookupIndex(partitionRecordKeyMap, context, hoodieTable);
-
-    if (LOG.isDebugEnabled()) {
-      long totalTaggedRecords = keyFilenamePairMap.values().size();
-      LOG.debug("Number of update records (ones tagged with a fileID): " + 
totalTaggedRecords);
-    }
-
-    // Step 3: Tag the incoming records, as inserts or updates, by joining 
with existing record keys
-    List<HoodieRecord<T>> taggedRecords = 
tagLocationBacktoRecords(keyFilenamePairMap, records);
-
-    return taggedRecords;
-  }
-
-  /**
-   * Lookup the location for each record key and return the 
pair<record_key,location> for all record keys already
-   * present and drop the record keys if not present.
-   */
-  private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
-      Map<String, List<String>> partitionRecordKeyMap, final 
HoodieEngineContext context,
-      final HoodieTable hoodieTable) {
-    // Obtain records per partition, in the incoming records
-    Map<String, Long> recordsPerPartition = new HashMap<>();
-    partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, 
Long.valueOf(partitionRecordKeyMap.get(k).size())));
-    List<String> affectedPartitionPathList = new 
ArrayList<>(recordsPerPartition.keySet());
-
-    // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
-        loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
-    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Pair::getLeft, 
mapping(Pair::getRight, toList())));
-
-    // Step 3: Obtain a List, for each incoming record, that already exists, 
with the file id,
-    // that contains it.
-    List<Pair<String, HoodieKey>> fileComparisons =
-            explodeRecordsWithFileComparisons(partitionToFileInfo, 
partitionRecordKeyMap);
-    return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
-  }
-
-  /**
-   * Load all involved files as <Partition, filename> pair List.
-   */
-  //TODO duplicate code with spark, we can optimize this method later
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> 
partitions, final HoodieEngineContext context,
-                                                             final HoodieTable 
hoodieTable) {
-    // Obtain the latest data files from all the partitions.
-    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
-        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
-        .collect(toList());
-
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
-        }
-      }, Math.max(partitionPathFileIDList.size(), 1));
-    } else {
-      return partitionPathFileIDList.stream()
-          .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
-    }
-  }
-
-  @Override
-  public boolean rollbackCommit(String instantTime) {
-    // Nope, don't need to do anything.
-    return true;
-  }
-
-  /**
-   * This is not global, since we depend on the partitionPath to do the lookup.
-   */
-  @Override
-  public boolean isGlobal() {
-    return false;
-  }
-
-  /**
-   * No indexes into log files yet.
-   */
-  @Override
-  public boolean canIndexLogFiles() {
-    return false;
-  }
-
-  /**
-   * Bloom filters are stored, into the same data files.
-   */
-  @Override
-  public boolean isImplicitWithStorage() {
-    return true;
-  }
-
-  /**
-   * For each incoming record, produce N output records, 1 each for each file 
against which the record's key needs to be
-   * checked. For tables, where the keys have a definite insert order (e.g: 
timestamp as prefix), the number of files
-   * to be compared gets cut down a lot from range pruning.
-   * <p>
-   * Sub-partition to ensure the records can be looked up against files & also 
prune file<=>record comparisons based on
-   * recordKey ranges in the index info.
-   */
-  List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
-      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
-      Map<String, List<String>> partitionRecordKeyMap) {
-    IndexFileFilter indexFileFilter =
-        config.useBloomIndexTreebasedFilter() ? new 
IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
-            : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
-
-    List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
-    partitionRecordKeyMap.keySet().forEach(partitionPath ->  {
-      List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
-      hoodieRecordKeys.forEach(hoodieRecordKey -> {
-        indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
hoodieRecordKey).forEach(partitionFileIdPair -> {
-          fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
-                  new HoodieKey(hoodieRecordKey, partitionPath)));
-        });
-      });
-    });
-    return fileRecordPairs;
-  }
-
-  /**
-   * Find out <RowKey, filename> pair.
-   */
-  Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
-      List<Pair<String, HoodieKey>> fileComparisons,
-      HoodieTable hoodieTable) {
-
-    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> 
o1.getLeft().compareTo(o2.getLeft())).collect(toList());
-
-    List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new 
ArrayList<>();
-
-    Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new 
HoodieFlinkBloomIndexCheckFunction(hoodieTable, 
config).apply(fileComparisons.iterator());
-    while (iterator.hasNext()) {
-      keyLookupResults.addAll(iterator.next());
-    }
-
-    Map<HoodieKey, HoodieRecordLocation> hoodieRecordLocationMap = new 
HashMap<>();
-
-    keyLookupResults = keyLookupResults.stream().filter(lr -> 
lr.getMatchingRecordKeys().size() > 0).collect(toList());
-    keyLookupResults.forEach(lookupResult -> {
-      lookupResult.getMatchingRecordKeys().forEach(r -> {
-        hoodieRecordLocationMap.put(new HoodieKey(r, 
lookupResult.getPartitionPath()), new 
HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId()));
-      });
-    });
-
-    return hoodieRecordLocationMap;
-  }
-
-
-  /**
-   * Tag the <rowKey, filename> back to the original HoodieRecord List.
-   */
-  protected List<HoodieRecord<T>> tagLocationBacktoRecords(
-          Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, 
List<HoodieRecord<T>> records) {
-    Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
-    records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
-    // Here as the record might have more data than rowKey (some rowKeys' 
fileId is null),
-    // so we do left outer join.
-    List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new 
ArrayList<>();
-    keyRecordPairMap.keySet().forEach(k -> {
-      if (keyFilenamePair.containsKey(k)) {
-        newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
-      } else {
-        newList.add(Pair.of(keyRecordPairMap.get(k), null));
-      }
-    });
-    List<HoodieRecord<T>> res = Lists.newArrayList();
-    for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
-      res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), 
Option.ofNullable(v.getRight())));
-    }
-    return res;
-  }
-
-  @Override
-  public List<WriteStatus> updateLocation(List<WriteStatus> writeStatusList, 
HoodieEngineContext context,
-                                          HoodieTable<T, 
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
-    return writeStatusList;
-  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java
new file mode 100644
index 0000000..47d47c8
--- /dev/null
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.bloom.HoodieBaseBloomIndex;
+
+/**
+ * Indexing mechanism based on bloom filter. Each parquet file includes its 
row_key bloom filter in its metadata.
+ */
+public class JavaHoodieBloomIndex<T extends HoodieRecordPayload> extends 
HoodieBaseBloomIndex<T> {
+  public JavaHoodieBloomIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
index 0239ee9..fc7a451 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java
@@ -38,7 +38,7 @@ public abstract class JavaHoodieIndex<T extends 
HoodieRecordPayload> extends Hoo
     super(config);
   }
 
-  public static JavaHoodieIndex createIndex(HoodieWriteConfig config) {
+  public static HoodieIndex createIndex(HoodieWriteConfig config) {
     // first use index class config to create index.
     if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
       Object instance = ReflectionUtils.loadClass(config.getIndexClass(), 
config);
@@ -52,6 +52,8 @@ public abstract class JavaHoodieIndex<T extends 
HoodieRecordPayload> extends Hoo
     switch (config.getIndexType()) {
       case INMEMORY:
         return new JavaInMemoryHashIndex(config);
+      case BLOOM:
+        return new JavaHoodieBloomIndex(config);
       default:
         throw new HoodieIndexException("Unsupported index type " + 
config.getIndexType());
     }

Reply via email to