alexeykudinkin commented on a change in pull request #4480:
URL: https://github.com/apache/hudi/pull/4480#discussion_r816374506



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * in-memory structure to speed up rang mapping (hashing value -> hashing 
node)

Review comment:
       Typo: `rang` should be `range`.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * in-memory structure to speed up rang mapping (hashing value -> hashing 
node)
+   */
+  private TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * mapping from fileId -> hashing node
+   */
+  private Map<String, ConsistentHashingNode> fileIdToBucket;
+
+  public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) {
+    this.metadata = metadata;
+    initialize();
+  }
+
+  public Collection<ConsistentHashingNode> getNodes() {
+    return ring.values();
+  }
+
+  public HoodieConsistentHashingMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getNumBuckets() {
+    return getNodes().size();
+  }
+
+  /**
+   * Get bucket of the given file group
+   *
+   * @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
+   * @return
+   */
+  public ConsistentHashingNode getBucketByFileId(String fileId) {
+    return fileIdToBucket.get(fileId);
+  }
+
+  public ConsistentHashingNode getBucket(HoodieKey hoodieKey, String 
indexKeyFields) {
+    return getBucket(getHashKeys(hoodieKey, indexKeyFields));
+  }
+
+  protected ConsistentHashingNode getBucket(List<String> hashKeys) {
+    int hashValue = 0;
+    for (int i = 0; i < hashKeys.size(); ++i) {
+      hashValue = HashID.getXXHash32(hashKeys.get(i), hashValue);
+    }
+    return getBucket(hashValue & 
HoodieConsistentHashingMetadata.MAX_HASH_VALUE);

Review comment:
       Let's rename `MAX_HASH_VALUE` to `HASH_VALUE_MASK` to make it clear that it's used as a mask.
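       For illustration, a rough sketch of what the rename could look like and why the mask matters (the constant keeps its current value; `HASH_VALUE_MASK` is only the suggested name):

```java
// Suggested rename; same value as today's MAX_HASH_VALUE.
public static final int HASH_VALUE_MASK = Integer.MAX_VALUE;

protected ConsistentHashingNode getBucket(List<String> hashKeys) {
  int hashValue = 0;
  for (String hashKey : hashKeys) {
    hashValue = HashID.getXXHash32(hashKey, hashValue);
  }
  // ANDing with Integer.MAX_VALUE clears the sign bit, so the ring lookup key is always non-negative.
  return getBucket(hashValue & HoodieConsistentHashingMetadata.HASH_VALUE_MASK);
}
```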

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this 
case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a write must 
have 00000000000000 timestamp and can be viewed

Review comment:
       But what about completed commits?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
##########
@@ -68,62 +69,46 @@ public HoodieBucketIndex(HoodieWriteConfig config) {
       HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
       HoodieTable hoodieTable)
       throws HoodieIndexException {
-    HoodieData<HoodieRecord<R>> taggedRecords = 
records.mapPartitions(recordIter -> {
-      // partitionPath -> bucketId -> fileInfo
-      Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList 
= new HashMap<>();
-      return new LazyIterableIterator<HoodieRecord<R>, 
HoodieRecord<R>>(recordIter) {
+    // initialize necessary information before tagging. e.g., hashing metadata
+    List<String> partitions = 
records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
+    LOG.info("Initializing hashing metadata for partitions: " + partitions);
+    initialize(hoodieTable, partitions);
 
-        @Override
-        protected void start() {
+    return records.mapPartitions(iterator ->
+        new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
 
-        }
+          @Override
+          protected void start() {

Review comment:
       If these methods are clearly optional, should we add default implementations for them to avoid this boilerplate?
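       Roughly what that could look like (a sketch only; `start()` is taken from the usage above, and the matching teardown hook `end()` is assumed):

```java
// Illustrative sketch: give the lifecycle hooks no-op defaults so subclasses
// only override the ones they actually need.
public abstract class LazyIterableIterator<I, O> {

  protected void start() {
    // no-op by default; override for per-partition setup
  }

  protected void end() {
    // no-op by default; override for cleanup
  }

  // ... existing iteration logic stays unchanged ...
}
```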

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;

Review comment:
       Same comment as above regarding `final`; please fix this across the PR.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this 
case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a write must 
have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed 
write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, 
p);
+      ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file 
group.
+       * We add suffix 0 here to the file uuid, following the naming 
convention.

Review comment:
       Can you please explicitly call out the convention you're referring to?
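       For example, the comment could spell it out roughly like this (assuming the convention being referenced is the `<fileIdPrefix>-<index>` file id format produced by `FSUtils.createNewFileId`; the wording is only a suggestion):

```java
// File ids follow the "<fileIdPrefix>-<index>" convention. A consistent-hashing bucket maps to exactly
// one file group, and tagging does not need its instant time, so we always use index 0 here.
return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0));
```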

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * in-memory structure to speed up rang mapping (hashing value -> hashing 
node)
+   */
+  private TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * mapping from fileId -> hashing node
+   */
+  private Map<String, ConsistentHashingNode> fileIdToBucket;

Review comment:
       Please make all fields final

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  /**
+   * partitionPath -> bucketId -> fileInfo
+   */
+  Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList;
+
+  public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  private Map<Integer, Pair<String, String>> 
loadPartitionBucketIdFileIdMapping(
+      HoodieTable hoodieTable,
+      String partition) {
+    // bucketId -> fileIds
+    Map<Integer, Pair<String, String>> fileIDList = new HashMap<>();

Review comment:
       `fileIDList` doesn't seem to be a particularly representative name; shall we call it `bucketIdToFileIdMapping`?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * in-memory structure to speed up rang mapping (hashing value -> hashing 
node)
+   */
+  private TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * mapping from fileId -> hashing node

Review comment:
       Let's make the comments consistent (they should start with a capital letter).

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
##########
@@ -37,28 +36,30 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 
 /**
  * Hash indexing mechanism.
  */
-public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
+public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
 
-  private static final Logger LOG =  
LogManager.getLogger(HoodieBucketIndex.class);
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBucketIndex.class);
 
-  private final int numBuckets;
+  protected int numBuckets;
+  protected String indexKeyFields;
 
   public HoodieBucketIndex(HoodieWriteConfig config) {
     super(config);
-    numBuckets = config.getBucketIndexNumBuckets();
-    LOG.info("use bucket index, numBuckets=" + numBuckets);
+
+    this.numBuckets = config.getBucketIndexNumBuckets();
+    this.indexKeyFields = config.getBucketIndexHashField();
+    LOG.info("use bucket index, numBuckets = " + numBuckets + ", indexFields: 
" + indexKeyFields);

Review comment:
       Please fix the log statement to start with a capital letter.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this 
case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a write must 
have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed 
write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, 
p);
+      ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file 
group.
+       * We add suffix 0 here to the file uuid, following the naming 
convention.
+       */
+      return new HoodieRecordLocation(null, 
FSUtils.createNewFileId(node.getFileIdPfx(), 0));
+    }
+
+    LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
+        + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() 
+ ", record_key: " + key.toString());
+    throw new HoodieIndexException("Failed to getBucket as hashing node has no 
file group");
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, 
create a new one (also persist it into storage)
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable 
table, String partition) {
+    int retry = 3;
+    // TODO maybe use ConsistencyGuard to do the retry thing
+    // retry to allow concurrent creation of metadata (only one attempt can 
succeed)
+    while (retry-- > 0) {

Review comment:
       That indeed seems like a perfect candidate for abstraction within `ConsistencyGuard`.
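       Until such an abstraction exists, the retry loop could at least be factored into a small helper. A rough, hypothetical sketch (this is not the `ConsistencyGuard` API; names and shape are illustrative):

```java
// Hypothetical helper: retry an action a fixed number of times until it yields a value.
static <T> T retry(int attempts, Supplier<Optional<T>> action, String errorMessage) {
  for (int i = 0; i < attempts; i++) {
    Optional<T> result = action.get();
    if (result.isPresent()) {
      return result.get();
    }
  }
  throw new HoodieIndexException(errorMessage);
}

// Usage sketch inside loadOrCreateMetadata:
// return retry(3, () -> {
//   HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition);
//   if (metadata != null) {
//     return Optional.of(metadata);
//   }
//   HoodieConsistentHashingMetadata created = new HoodieConsistentHashingMetadata(partition, numBuckets);
//   return saveMetadata(table, created, false) ? Optional.of(created) : Optional.empty();
// }, "Failed to load or create metadata, partition: " + partition);
```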

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  /**
+   * partitionPath -> bucketId -> fileInfo
+   */
+  Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList;
+
+  public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  private Map<Integer, Pair<String, String>> 
loadPartitionBucketIdFileIdMapping(
+      HoodieTable hoodieTable,
+      String partition) {
+    // bucketId -> fileIds
+    Map<Integer, Pair<String, String>> fileIDList = new HashMap<>();
+    hoodieTable.getMetaClient().reloadActiveTimeline();
+    HoodieIndexUtils
+        .getLatestBaseFilesForPartition(partition, hoodieTable)
+        .forEach(file -> {
+          String fileId = file.getFileId();
+          String commitTime = file.getCommitTime();
+          int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
+          if (!fileIDList.containsKey(bucketId)) {
+            fileIDList.put(bucketId, Pair.of(fileId, commitTime));
+          } else {
+            // check if bucket data is valid
+            throw new HoodieIOException("Find multiple files at partition 
path="
+                + partition + " belongs to the same bucket id = " + bucketId);
+          }
+        });
+    return fileIDList;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionPathFileIDList = new HashMap<>();
+    partitions.forEach(p -> partitionPathFileIDList.put(p, 
loadPartitionBucketIdFileIdMapping(table, p)));
+  }
+
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    int bucketId = BucketIdentifier.getBucketId(key, 
config.getBucketIndexHashField(), numBuckets);
+    if (partitionPathFileIDList.get(partitionPath).containsKey(bucketId)) {

Review comment:
       Let's do just 2 hash-table lookups instead of 4.
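       For example, something along these lines (a sketch only; the rest of the method isn't visible in this hunk, so the miss branch and the `Pair` layout `(fileId, commitTime)` are assumed from the code above):

```java
@Override
protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) {
  int bucketId = BucketIdentifier.getBucketId(key, config.getBucketIndexHashField(), numBuckets);
  // One lookup for the partition map, one for the bucket entry.
  Pair<String, String> fileInfo = partitionPathFileIDList.get(partitionPath).get(bucketId);
  if (fileInfo != null) {
    return new HoodieRecordLocation(fileInfo.getRight(), fileInfo.getLeft());
  }
  // Assumed miss behavior: no file group for this bucket yet, so treat the record as an insert.
  return null;
}
```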

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  /**
+   * partitionPath -> bucketId -> fileInfo
+   */
+  Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList;

Review comment:
       Please make this `private final`, ditto everywhere

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this 
case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a write must 
have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed 
write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, 
p);
+      ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file 
group.
+       * We add suffix 0 here to the file uuid, following the naming 
convention.
+       */
+      return new HoodieRecordLocation(null, 
FSUtils.createNewFileId(node.getFileIdPfx(), 0));
+    }
+
+    LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
+        + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() 
+ ", record_key: " + key.toString());
+    throw new HoodieIndexException("Failed to getBucket as hashing node has no 
file group");
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, 
create a new one (also persist it into storage)
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable 
table, String partition) {
+    int retry = 3;
+    // TODO maybe use ConsistencyGuard to do the retry thing
+    // retry to allow concurrent creation of metadata (only one attempt can 
succeed)
+    while (retry-- > 0) {
+      HoodieConsistentHashingMetadata metadata = loadMetadata(table, 
partition);
+      if (metadata == null) {
+        metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
+        if (saveMetadata(table, metadata, false)) {
+          return metadata;
+        }
+      } else {
+        return metadata;
+      }
+    }
+    throw new HoodieIndexException("Failed to load or create metadata, 
partition: " + partition);
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, 
return null
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata or null if it does not exist
+   */
+  public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable 
table, String partition) {
+    Path metadataPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+
+    try {
+      if (!table.getMetaClient().getFs().exists(metadataPath)) {
+        return null;
+      }
+      FileStatus[] metaFiles = 
table.getMetaClient().getFs().listStatus(metadataPath);
+      HoodieTimeline completedCommits = 
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();

Review comment:
       What about compactions? Can the index be updated after compaction?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this 
case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a write must 
have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed 
write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, 
p);
+      ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file 
group.
+       * We add suffix 0 here to the file uuid, following the naming 
convention.
+       */
+      return new HoodieRecordLocation(null, 
FSUtils.createNewFileId(node.getFileIdPfx(), 0));
+    }
+
+    LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
+        + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() 
+ ", record_key: " + key.toString());
+    throw new HoodieIndexException("Failed to getBucket as hashing node has no 
file group");
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, 
create a new one (also persist it into storage)
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable 
table, String partition) {
+    int retry = 3;
+    // TODO maybe use ConsistencyGuard to do the retry thing
+    // retry to allow concurrent creation of metadata (only one attempt can 
succeed)
+    while (retry-- > 0) {
+      HoodieConsistentHashingMetadata metadata = loadMetadata(table, 
partition);
+      if (metadata == null) {
+        metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
+        if (saveMetadata(table, metadata, false)) {
+          return metadata;
+        }
+      } else {
+        return metadata;
+      }
+    }
+    throw new HoodieIndexException("Failed to load or create metadata, 
partition: " + partition);
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, 
return null
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata or null if it does not exist
+   */
+  public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable 
table, String partition) {
+    Path metadataPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+
+    try {
+      if (!table.getMetaClient().getFs().exists(metadataPath)) {
+        return null;
+      }
+      FileStatus[] metaFiles = 
table.getMetaClient().getFs().listStatus(metadataPath);
+      HoodieTimeline completedCommits = 
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      Predicate<FileStatus> metaFilePredicate = fileStatus -> {
+        String filename = fileStatus.getPath().getName();
+        return 
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)
+            && 
(completedCommits.containsInstant(HoodieConsistentHashingMetadata.getTimestampFromFile(filename))
+            || 
HoodieConsistentHashingMetadata.getTimestampFromFile(filename).equals(HoodieTimeline.INIT_INSTANT_TS));
+      };
+
+      FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
+          .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
+
+      if (metaFile != null) {
+        byte[] content = 
FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+        return HoodieConsistentHashingMetadata.fromBytes(content);
+      }
+
+      return null;
+    } catch (IOException e) {
+      LOG.warn("Error when loading hashing metadata, partition: " + partition 
+ ", error meg: " + e.getMessage());
+      throw new HoodieIndexException("Error while loading hashing metadata, " 
+ e.getMessage());
+    }
+  }
+
+  /**
+   * Save metadata into storage
+   * @param table
+   * @param metadata
+   * @param overwrite
+   * @return
+   */
+  private static boolean saveMetadata(HoodieTable table, 
HoodieConsistentHashingMetadata metadata, boolean overwrite) {
+    FSDataOutputStream fsOut = null;
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
metadata.getPartitionPath());
+    Path fullPath = new Path(dir, metadata.getFilename());
+    try {
+      byte[] bytes = metadata.toBytes();
+      fsOut = fs.create(fullPath, overwrite);
+      fsOut.write(bytes);
+      fsOut.close();
+      return true;
+    } catch (IOException e) {
+      e.printStackTrace();

Review comment:
       Please clean this up; log it as an error with the exception attached instead.
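       Something like the following sketch (try-with-resources also removes the manual stream handling; the `return false` on failure is assumed from how the caller uses the boolean result):

```java
private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) {
  HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
  Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
  Path fullPath = new Path(dir, metadata.getFilename());
  try (FSDataOutputStream fsOut = fs.create(fullPath, overwrite)) {
    fsOut.write(metadata.toBytes());
    return true;
  } catch (IOException e) {
    // Log with the exception attached instead of printing the stack trace.
    LOG.error("Failed to save bucket hashing metadata at " + fullPath, e);
    return false;
  }
}
```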

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * in-memory structure to speed up rang mapping (hashing value -> hashing 
node)
+   */
+  private TreeMap<Integer, ConsistentHashingNode> ring;

Review comment:
       I actually didn't see us ever re-balancing that ring. Is this going to be added in a follow-up PR?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * All the metadata that is used for consistent hashing bucket index
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieConsistentHashingMetadata implements Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieConsistentHashingMetadata.class);
+
+  public static final int MAX_HASH_VALUE = Integer.MAX_VALUE;
+  public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
+
+  protected short version;
+  protected String partitionPath;
+  protected String instant;
+  protected int numBuckets;
+  protected int seqNo;
+  protected List<ConsistentHashingNode> nodes;
+
+  public HoodieConsistentHashingMetadata() {
+  }
+
+  public HoodieConsistentHashingMetadata(String partitionPath, int numBuckets) 
{
+    this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets);
+  }
+
+  /**
+   * Construct default metadata with all bucket's file group uuid initialized
+   *
+   * @param partitionPath
+   * @param numBuckets
+   */
+  private HoodieConsistentHashingMetadata(short version, String partitionPath, 
String instant, int numBuckets) {
+    this.version = version;
+    this.partitionPath = partitionPath;
+    this.instant = instant;
+    this.numBuckets = numBuckets;
+
+    nodes = new ArrayList<>();
+    long step = ((long) MAX_HASH_VALUE + numBuckets - 1) / numBuckets;
+    for (int i = 1; i <= numBuckets; ++i) {
+      nodes.add(new ConsistentHashingNode((int) Math.min(step * i, 
MAX_HASH_VALUE), FSUtils.createNewFileIdPfx()));
+    }
+  }
+
+  public short getVersion() {
+    return version;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public String getInstant() {
+    return instant;
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  public int getSeqNo() {
+    return seqNo;
+  }
+
+  public List<ConsistentHashingNode> getNodes() {
+    return nodes;
+  }
+
+  public void setInstant(String instant) {
+    this.instant = instant;
+  }
+
+  public void setNodes(List<ConsistentHashingNode> nodes) {
+    this.nodes = nodes;
+    this.numBuckets = nodes.size();
+  }
+
+  public void setSeqNo(int seqNo) {
+    this.seqNo = seqNo;
+  }
+
+  public String getFilename() {
+    return instant + HASHING_METADATA_FILE_SUFFIX;
+  }
+
+  public byte[] toBytes() throws IOException {
+    return toJsonString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  public static HoodieConsistentHashingMetadata fromBytes(byte[] bytes) throws 
IOException {
+    try {
+      return fromJsonString(new String(bytes, StandardCharsets.UTF_8), 
HoodieConsistentHashingMetadata.class);
+    } catch (Exception e) {
+      throw new IOException("unable to read hashing metadata", e);
+    }
+  }
+
+  private String toJsonString() throws IOException {
+    return 
getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+  }
+
+  protected static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws 
Exception {
+    if (jsonStr == null || jsonStr.isEmpty()) {
+      // For empty commit file (no data or something bad happened).
+      return clazz.newInstance();
+    }
+    return getObjectMapper().readValue(jsonStr, clazz);
+  }
+
+  protected static ObjectMapper getObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();

Review comment:
       Please check my other comment regarding `ObjectMapper`

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * All the metadata that is used for consistent hashing bucket index
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieConsistentHashingMetadata implements Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieConsistentHashingMetadata.class);
+
+  public static final int MAX_HASH_VALUE = Integer.MAX_VALUE;
+  public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
+
+  protected short version;
+  protected String partitionPath;
+  protected String instant;
+  protected int numBuckets;
+  protected int seqNo;
+  protected List<ConsistentHashingNode> nodes;
+
+  public HoodieConsistentHashingMetadata() {

Review comment:
       As far as I can tell this is only serialized/deserialized by Jackson. Let's make this object immutable (all fields final, no setters) and instead use `@JsonCreator` annotations.
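    Something along these lines (a sketch only — field names are taken from this diff, the rest of the class and its existing imports are elided):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class HoodieConsistentHashingMetadata implements Serializable {

  private final short version;
  private final String partitionPath;
  private final String instant;
  private final int numBuckets;
  private final int seqNo;
  private final List<ConsistentHashingNode> nodes;

  @JsonCreator
  public HoodieConsistentHashingMetadata(@JsonProperty("version") short version,
                                         @JsonProperty("partitionPath") String partitionPath,
                                         @JsonProperty("instant") String instant,
                                         @JsonProperty("numBuckets") int numBuckets,
                                         @JsonProperty("seqNo") int seqNo,
                                         @JsonProperty("nodes") List<ConsistentHashingNode> nodes) {
    this.version = version;
    this.partitionPath = partitionPath;
    this.instant = instant;
    this.numBuckets = numBuckets;
    this.seqNo = seqNo;
    this.nodes = nodes;
  }

  // getters stay as they are; the setters and the no-arg constructor go away
}
```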

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.storage;
+
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Storage layout when using consistent hashing bucket index.
+ */
+public class HoodieConsistentBucketLayout extends HoodieStorageLayout {
+  public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = new 
HashSet<WriteOperationType>() {

Review comment:
       You can use `CollectionUtils.createImmutableSet` for that
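    For example (sketch, assuming `org.apache.hudi.common.util.CollectionUtils` is on the classpath; the concrete operation types below are placeholders — keep whatever the current anonymous `HashSet` initializer adds):

```java
public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = CollectionUtils.createImmutableSet(
    WriteOperationType.INSERT,
    WriteOperationType.UPSERT);
```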

##########
File path: 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.keygen.KeyGenUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestBucketIdentifier {
+
+  public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", 
\"name\":\"nested_col\",\"fields\": ["
+      + "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", 
\"type\": \"long\"}]}";
+  public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": 
\"testrec\",\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", 
\"type\": \"string\"},"
+      + "{\"name\": \"ts_ms\", \"type\": \"string\"},"
+      + "{\"name\": \"pii_col\", \"type\": \"string\"},"
+      + "{\"name\": \"nested_col\",\"type\": "
+      + NESTED_COL_SCHEMA + "}"
+      + "]}";
+
+  @Test
+  public void testBucketFileId() {
+    for (int i = 0; i < 1000; i++) {

Review comment:
       What's the point of doing 1000 iterations?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write, because the hashing metadata created by a write must have the 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as part of the failed write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, 
p);
+      ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file 
group.
+       * We add suffix 0 here to the file uuid, following the naming 
convention.
+       */
+      return new HoodieRecordLocation(null, 
FSUtils.createNewFileId(node.getFileIdPfx(), 0));
+    }
+
+    LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
+        + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() 
+ ", record_key: " + key.toString());
+    throw new HoodieIndexException("Failed to getBucket as hashing node has no 
file group");
+  }
+
+  /**
+   * Load hashing metadata of the given partition; if it does not exist, create a new one (and persist it into storage)
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable 
table, String partition) {
+    int retry = 3;
+    // TODO maybe use ConsistencyGuard to do the retry thing
+    // retry to allow concurrent creation of metadata (only one attempt can 
succeed)
+    while (retry-- > 0) {
+      HoodieConsistentHashingMetadata metadata = loadMetadata(table, 
partition);
+      if (metadata == null) {
+        metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
+        if (saveMetadata(table, metadata, false)) {
+          return metadata;
+        }
+      } else {
+        return metadata;
+      }
+    }
+    throw new HoodieIndexException("Failed to load or create metadata, 
partition: " + partition);
+  }
+
+  /**
+   * Load hashing metadata of the given partition; if it does not exist, return null
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata or null if it does not exist
+   */
+  public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable 
table, String partition) {
+    Path metadataPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+
+    try {
+      if (!table.getMetaClient().getFs().exists(metadataPath)) {
+        return null;
+      }
+      FileStatus[] metaFiles = 
table.getMetaClient().getFs().listStatus(metadataPath);
+      HoodieTimeline completedCommits = 
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      Predicate<FileStatus> metaFilePredicate = fileStatus -> {
+        String filename = fileStatus.getPath().getName();
+        return 
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)
+            && 
(completedCommits.containsInstant(HoodieConsistentHashingMetadata.getTimestampFromFile(filename))
+            || 
HoodieConsistentHashingMetadata.getTimestampFromFile(filename).equals(HoodieTimeline.INIT_INSTANT_TS));

Review comment:
       Please extract the repeated `getTimestampFromFile(filename)` call into a local variable.
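    For example (sketch; same logic as the predicate above, just with the duplicated call pulled into a local):

```java
Predicate<FileStatus> metaFilePredicate = fileStatus -> {
  String filename = fileStatus.getPath().getName();
  if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) {
    return false;
  }
  String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
  return completedCommits.containsInstant(timestamp)
      || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
};
```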

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write, because the hashing metadata created by a write must have the 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as part of the failed write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, 
p);
+      ConsistentBucketIdentifier identifier = new 
ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String 
partitionPath) {
+    ConsistentHashingNode node = 
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file 
group.
+       * We add suffix 0 here to the file uuid, following the naming 
convention.
+       */
+      return new HoodieRecordLocation(null, 
FSUtils.createNewFileId(node.getFileIdPfx(), 0));
+    }
+
+    LOG.error("Consistent hashing node has no file group, partition: " + 
partitionPath + ", meta: "
+        + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() 
+ ", record_key: " + key.toString());
+    throw new HoodieIndexException("Failed to getBucket as hashing node has no 
file group");
+  }
+
+  /**
+   * Load hashing metadata of the given partition; if it does not exist, create a new one (and persist it into storage)
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable 
table, String partition) {
+    int retry = 3;
+    // TODO maybe use ConsistencyGuard to do the retry thing
+    // retry to allow concurrent creation of metadata (only one attempt can 
succeed)
+    while (retry-- > 0) {
+      HoodieConsistentHashingMetadata metadata = loadMetadata(table, 
partition);
+      if (metadata == null) {
+        metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
+        if (saveMetadata(table, metadata, false)) {
+          return metadata;
+        }
+      } else {
+        return metadata;
+      }
+    }
+    throw new HoodieIndexException("Failed to load or create metadata, 
partition: " + partition);
+  }
+
+  /**
+   * Load hashing metadata of the given partition; if it does not exist, return null
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata or null if it does not exist
+   */
+  public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable 
table, String partition) {
+    Path metadataPath = 
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), 
partition);
+
+    try {
+      if (!table.getMetaClient().getFs().exists(metadataPath)) {
+        return null;
+      }
+      FileStatus[] metaFiles = 
table.getMetaClient().getFs().listStatus(metadataPath);
+      HoodieTimeline completedCommits = 
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      Predicate<FileStatus> metaFilePredicate = fileStatus -> {
+        String filename = fileStatus.getPath().getName();
+        return 
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)
+            && 
(completedCommits.containsInstant(HoodieConsistentHashingMetadata.getTimestampFromFile(filename))
+            || 
HoodieConsistentHashingMetadata.getTimestampFromFile(filename).equals(HoodieTimeline.INIT_INSTANT_TS));
+      };
+
+      FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
+          .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
+
+      if (metaFile != null) {

Review comment:
       A general rule of thumb is to return as early as possible, so that it's easier to comprehend the control flow while reading top-to-bottom without having to rewind to understand it. As such, please invert the conditional.
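    i.e. something like this (sketch — the metadata-loading step after the check is not shown in this hunk, so `loadMetadataFromFile` below is just a placeholder name):

```java
FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
    .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
if (metaFile == null) {
  return null;
}
return loadMetadataFromFile(table, metaFile); // placeholder for the elided happy path
```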

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  /**
+   * partitionPath -> bucketId -> fileInfo
+   */
+  Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList;

Review comment:
       Why store `Pair<String, String>` instead of `HoodieRecordLocation`?
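    A sketch of what that could look like (variable and method names here are illustrative):

```java
// partitionPath -> bucketId -> record location (instant time + fileId)
private final Map<String, Map<Integer, HoodieRecordLocation>> partitionPathToBucketLocation = new HashMap<>();

private void putLocation(String partitionPath, int bucketId, String commitTime, String fileId) {
  partitionPathToBucketLocation
      .computeIfAbsent(partitionPath, p -> new HashMap<>())
      .put(bucketId, new HoodieRecordLocation(commitTime, fileId));
}
```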

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket 
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> 
writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write, because the hashing metadata created by a write must have the 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as part of the failed write.
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() 
/ 3);

Review comment:
       Why overallocate?
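    If the `size + size/3` term is meant to pre-size the map for HashMap's default 0.75 load factor, it would read clearer to spell that intent out (sketch of that assumption below) or simply use the default capacity:

```java
// pre-size so partitions.size() entries fit without rehashing under the default 0.75 load factor
partitionToIdentifier = new HashMap<>((int) Math.ceil(partitions.size() / 0.75d));
```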

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Used in consistent hashing index, representing nodes in the consistent hash 
ring.
+ * Record the end hash range value and its corresponding file group id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ConsistentHashingNode implements Serializable {
+
+  private int value;
+  private String fileIdPfx;
+
+  public ConsistentHashingNode() {
+  }
+
+  public ConsistentHashingNode(int value, String fileIdPfx) {
+    this.value = value;
+    this.fileIdPfx = fileIdPfx;
+  }
+
+  public static String toJsonString(List<ConsistentHashingNode> nodes) throws 
IOException {
+    return 
getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(nodes);
+  }
+
+  public static List<ConsistentHashingNode> fromJsonString(String json) throws 
Exception {
+    if (json == null || json.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    ConsistentHashingNode[] nodes = getObjectMapper().readValue(json, 
ConsistentHashingNode[].class);
+    return Arrays.asList(nodes);
+  }
+
+  protected static ObjectMapper getObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();

Review comment:
       This isn't a cheap object to throw around, and churning it like that chips away at its ability to cache encoders.

    Let's instead init this as a static object.
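    Sketch (the `configure`/`setVisibility` calls are illustrative — the actual configuration is elided in this hunk; a configured `ObjectMapper` is thread-safe to share):

```java
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

protected static ObjectMapper getObjectMapper() {
  return OBJECT_MAPPER;
}
```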




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

