alexeykudinkin commented on a change in pull request #4480: URL: https://github.com/apache/hudi/pull/4480#discussion_r816374506
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.hash.HashID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +public class ConsistentBucketIdentifier extends BucketIdentifier { + + /** + * Hashing metadata of a partition + */ + private final HoodieConsistentHashingMetadata metadata; + /** + * in-memory structure to speed up rang mapping (hashing value -> hashing node) Review comment: Typo ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.hash.HashID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +public class ConsistentBucketIdentifier extends BucketIdentifier { + + /** + * Hashing metadata of a partition + */ + private final HoodieConsistentHashingMetadata metadata; + /** + * in-memory structure to speed up rang mapping (hashing value -> hashing node) + */ + private TreeMap<Integer, ConsistentHashingNode> ring; + /** + * mapping from fileId -> hashing node + */ + private Map<String, ConsistentHashingNode> fileIdToBucket; + + public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) { + this.metadata = metadata; + initialize(); + } + + public Collection<ConsistentHashingNode> getNodes() { + return ring.values(); + } + + public HoodieConsistentHashingMetadata getMetadata() { + return metadata; + } + + public int getNumBuckets() { + return getNodes().size(); + } + + /** + * Get bucket of the given file group + * + * @param fileId the file group id. NOTE: not filePfx (i.e., uuid) + * @return + */ + public ConsistentHashingNode getBucketByFileId(String fileId) { + return fileIdToBucket.get(fileId); + } + + public ConsistentHashingNode getBucket(HoodieKey hoodieKey, String indexKeyFields) { + return getBucket(getHashKeys(hoodieKey, indexKeyFields)); + } + + protected ConsistentHashingNode getBucket(List<String> hashKeys) { + int hashValue = 0; + for (int i = 0; i < hashKeys.size(); ++i) { + hashValue = HashID.getXXHash32(hashKeys.get(i), hashValue); + } + return getBucket(hashValue & HoodieConsistentHashingMetadata.MAX_HASH_VALUE); Review comment: Let's rename `MAX_HASH_VALUE` to be `HASH_VALUE_MASK` to make it clear that it's used as a mask
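A minimal sketch of the suggested rename (assuming the constant keeps its current value of `Integer.MAX_VALUE`): the value works as a bit mask because `0x7FFFFFFF` clears the sign bit, so the combined hash stays non-negative.

```java
// Hedged sketch of the rename; the logic mirrors the getBucket(...) body in the diff.
public static final int HASH_VALUE_MASK = Integer.MAX_VALUE; // 0x7FFFFFFF

protected ConsistentHashingNode getBucket(List<String> hashKeys) {
  int hashValue = 0;
  for (int i = 0; i < hashKeys.size(); ++i) {
    hashValue = HashID.getXXHash32(hashKeys.get(i), hashValue);
  }
  // Masking with 0x7FFFFFFF zeroes the sign bit, keeping the ring lookup key non-negative.
  return getBucket(hashValue & HoodieConsistentHashingMetadata.HASH_VALUE_MASK);
}
```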
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed Review comment: But what about completed commits? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java ########## @@ -68,62 +69,46 @@ public HoodieBucketIndex(HoodieWriteConfig config) { HoodieData<HoodieRecord<R>> records, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { - HoodieData<HoodieRecord<R>> taggedRecords = records.mapPartitions(recordIter -> { - // partitionPath -> bucketId -> fileInfo - Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList = new HashMap<>(); - return new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(recordIter) { + // initialize necessary information before tagging. e.g., hashing metadata + List<String> partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList(); + LOG.info("Initializing hashing metadata for partitions: " + partitions); + initialize(hoodieTable, partitions); - @Override - protected void start() { + return records.mapPartitions(iterator -> + new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) { - } + @Override + protected void start() { Review comment: If these methods are clearly optional, should we add default implementations for them to avoid this boilerplate?
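One way that could look, sketched under the assumption that `start()`/`end()` are currently abstract hooks (this is a simplified stand-in, not the actual Hudi class):

```java
import java.util.Iterator;

// Simplified sketch, not the real Hudi class: give the lifecycle hooks no-op
// default bodies so subclasses such as the tagging iterator above only need
// to override computeNext().
abstract class LazyIterableIterator<I, O> implements Iterator<O> {
  protected final Iterator<I> inputItr;

  protected LazyIterableIterator(Iterator<I> in) {
    this.inputItr = in;
  }

  protected void start() {} // optional setup hook, no-op by default

  protected void end() {}   // optional cleanup hook, no-op by default

  // The only method a subclass must implement.
  protected abstract O computeNext();

  @Override
  public boolean hasNext() {
    return inputItr.hasNext();
  }

  @Override
  public O next() {
    return computeNext();
  }
}
```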
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; Review comment: Same comment as above regarding `final`, please fix across the PR ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. 
+ * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); + + // TODO maybe parallel + partitions.stream().forEach(p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata); + partitionToIdentifier.put(p, identifier); + }); + } + + /** + * Get bucket location for given key and partition + * + * @param key + * @param partitionPath + * @return + */ + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. + * We add suffix 0 here to the file uuid, following the naming convention. Review comment: Can you please explicitly call out the convention you're referring to? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.hash.HashID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +public class ConsistentBucketIdentifier extends BucketIdentifier { + + /** + * Hashing metadata of a partition + */ + private final HoodieConsistentHashingMetadata metadata; + /** + * in-memory structure to speed up rang mapping (hashing value -> hashing node) + */ + private TreeMap<Integer, ConsistentHashingNode> ring; + /** + * mapping from fileId -> hashing node + */ + private Map<String, ConsistentHashingNode> fileIdToBucket; Review comment: Please make all fields final ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Simple bucket index implementation, with fixed bucket number. + */ +public class HoodieSimpleBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class); + + /** + * partitionPath -> bucketId -> fileInfo + */ + Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList; + + public HoodieSimpleBucketIndex(HoodieWriteConfig config) { + super(config); + } + + private Map<Integer, Pair<String, String>> loadPartitionBucketIdFileIdMapping( + HoodieTable hoodieTable, + String partition) { + // bucketId -> fileIds + Map<Integer, Pair<String, String>> fileIDList = new HashMap<>(); Review comment: `fileIdList` doesn't seem to be particularly representative, shall we call it `bucketIdToFileIdMapping`? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.hash.HashID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +public class ConsistentBucketIdentifier extends BucketIdentifier { + + /** + * Hashing metadata of a partition + */ + private final HoodieConsistentHashingMetadata metadata; + /** + * in-memory structure to speed up rang mapping (hashing value -> hashing node) + */ + private TreeMap<Integer, ConsistentHashingNode> ring; + /** + * mapping from fileId -> hashing node Review comment: Let's make comments consistent (they should start with a capital letter) ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java ########## @@ -37,28 +36,30 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.util.HashMap; -import java.util.Map; +import java.util.List; /** * Hash indexing mechanism. */ -public class HoodieBucketIndex extends HoodieIndex<Object, Object> { +public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> { - private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class); + private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class); - private final int numBuckets; + protected int numBuckets; + protected String indexKeyFields; public HoodieBucketIndex(HoodieWriteConfig config) { super(config); - numBuckets = config.getBucketIndexNumBuckets(); - LOG.info("use bucket index, numBuckets=" + numBuckets); + + this.numBuckets = config.getBucketIndexNumBuckets(); + this.indexKeyFields = config.getBucketIndexHashField(); + LOG.info("use bucket index, numBuckets = " + numBuckets + ", indexFields: " + indexKeyFields); Review comment: Please fix log statement to start w/ capital ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. + * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); + + // TODO maybe parallel + partitions.stream().forEach(p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata); + partitionToIdentifier.put(p, identifier); + }); + } + + /** + * Get bucket location for given key and partition + * + * @param key + * @param partitionPath + * @return + */ + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. 
+ * We add suffix 0 here to the file uuid, following the naming convention. + */ + return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0)); + } + + LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString()); + throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, create a new one (also persist it into storage) + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata + */ + public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) { + int retry = 3; + // TODO maybe use ConsistencyGuard to do the retry thing + // retry to allow concurrent creation of metadata (only one attempt can succeed) + while (retry-- > 0) { Review comment: That indeed seems like a perfect candidate for abstraction w/in `ConsistencyGuard`
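Hoisting that loop into a shared helper could look like the following hypothetical sketch; `RetryHelper` and its signature are invented for illustration and are not part of the existing `ConsistencyGuard` API.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper (not ConsistencyGuard today): run an action up to
// `attempts` times and return the first non-empty result.
final class RetryHelper {

  private RetryHelper() {}

  static <T> T retry(int attempts, Supplier<Optional<T>> action, Supplier<RuntimeException> onExhausted) {
    for (int i = 0; i < attempts; i++) {
      Optional<T> result = action.get();
      if (result.isPresent()) {
        return result.get();
      }
    }
    throw onExhausted.get();
  }
}
```

`loadOrCreateMetadata` could then wrap its load-then-create-then-save attempt in a single lambda, keeping the concurrent-creation reasoning in one place.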
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Simple bucket index implementation, with fixed bucket number.
+ */ +public class HoodieSimpleBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class); + + /** + * partitionPath -> bucketId -> fileInfo + */ + Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList; + + public HoodieSimpleBucketIndex(HoodieWriteConfig config) { + super(config); + } + + private Map<Integer, Pair<String, String>> loadPartitionBucketIdFileIdMapping( + HoodieTable hoodieTable, + String partition) { + // bucketId -> fileIds + Map<Integer, Pair<String, String>> fileIDList = new HashMap<>(); + hoodieTable.getMetaClient().reloadActiveTimeline(); + HoodieIndexUtils + .getLatestBaseFilesForPartition(partition, hoodieTable) + .forEach(file -> { + String fileId = file.getFileId(); + String commitTime = file.getCommitTime(); + int bucketId = BucketIdentifier.bucketIdFromFileId(fileId); + if (!fileIDList.containsKey(bucketId)) { + fileIDList.put(bucketId, Pair.of(fileId, commitTime)); + } else { + // check if bucket data is valid + throw new HoodieIOException("Find multiple files at partition path=" + + partition + " belongs to the same bucket id = " + bucketId); + } + }); + return fileIDList; + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionPathFileIDList = new HashMap<>(); + partitions.forEach(p -> partitionPathFileIDList.put(p, loadPartitionBucketIdFileIdMapping(table, p))); + } + + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + int bucketId = BucketIdentifier.getBucketId(key, config.getBucketIndexHashField(), numBuckets); + if (partitionPathFileIDList.get(partitionPath).containsKey(bucketId)) { Review comment: Let's do just 2 hash-table lookups instead of 4
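A hedged sketch of that reduction (the body after `containsKey` is truncated in the diff, so the returned location is an assumption based on `loadPartitionBucketIdFileIdMapping` storing `Pair.of(fileId, commitTime)`):

```java
// Fetch each map once and branch on null, instead of containsKey + get on both levels.
Map<Integer, Pair<String, String>> bucketToFile = partitionPathFileIDList.get(partitionPath);
Pair<String, String> fileInfo = bucketToFile == null ? null : bucketToFile.get(bucketId);
if (fileInfo != null) {
  // Pair is (fileId, commitTime), per loadPartitionBucketIdFileIdMapping above.
  return new HoodieRecordLocation(fileInfo.getRight(), fileInfo.getLeft());
}
return null;
```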
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Simple bucket index implementation, with fixed bucket number.
+ */ +public class HoodieSimpleBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class); + + /** + * partitionPath -> bucketId -> fileInfo + */ + Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList; Review comment: Please make this `private final`, ditto everywhere ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write.
Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. + * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); + + // TODO maybe parallel + partitions.stream().forEach(p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata); + partitionToIdentifier.put(p, identifier); + }); + } + + /** + * Get bucket location for given key and partition + * + * @param key + * @param partitionPath + * @return + */ + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. + * We add suffix 0 here to the file uuid, following the naming convention. + */ + return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0)); + } + + LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString()); + throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, create a new one (also persist it into storage) + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata + */ + public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) { + int retry = 3; + // TODO maybe use ConsistencyGuard to do the retry thing + // retry to allow concurrent creation of metadata (only one attempt can succeed) + while (retry-- > 0) { + HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition); + if (metadata == null) { + metadata = new HoodieConsistentHashingMetadata(partition, numBuckets); + if (saveMetadata(table, metadata, false)) { + return metadata; + } + } else { + return metadata; + } + } + throw new HoodieIndexException("Failed to load or create metadata, partition: " + partition); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, return null + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata or null if it does not exist + */ + public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) { + Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition); + + try { + if (!table.getMetaClient().getFs().exists(metadataPath)) { + return null; + } + FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath); + HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); Review comment: What about compactions? 
Can the index be updated after compaction? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. 
+ * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); + + // TODO maybe parallel + partitions.stream().forEach(p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata); + partitionToIdentifier.put(p, identifier); + }); + } + + /** + * Get bucket location for given key and partition + * + * @param key + * @param partitionPath + * @return + */ + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. + * We add suffix 0 here to the file uuid, following the naming convention. + */ + return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0)); + } + + LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString()); + throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, create a new one (also persist it into storage) + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata + */ + public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) { + int retry = 3; + // TODO maybe use ConsistencyGuard to do the retry thing + // retry to allow concurrent creation of metadata (only one attempt can succeed) + while (retry-- > 0) { + HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition); + if (metadata == null) { + metadata = new HoodieConsistentHashingMetadata(partition, numBuckets); + if (saveMetadata(table, metadata, false)) { + return metadata; + } + } else { + return metadata; + } + } + throw new HoodieIndexException("Failed to load or create metadata, partition: " + partition); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, return null + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata or null if it does not exist + */ + public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) { + Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition); + + try { + if (!table.getMetaClient().getFs().exists(metadataPath)) { + return null; + } + FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath); + HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + Predicate<FileStatus> metaFilePredicate = fileStatus -> { + String filename = fileStatus.getPath().getName(); + return filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX) + && 
(completedCommits.containsInstant(HoodieConsistentHashingMetadata.getTimestampFromFile(filename)) + || HoodieConsistentHashingMetadata.getTimestampFromFile(filename).equals(HoodieTimeline.INIT_INSTANT_TS)); + }; + + FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate) + .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null); + + if (metaFile != null) { + byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath())); + return HoodieConsistentHashingMetadata.fromBytes(content); + } + + return null; + } catch (IOException e) { + LOG.warn("Error when loading hashing metadata, partition: " + partition + ", error meg: " + e.getMessage()); + throw new HoodieIndexException("Error while loading hashing metadata, " + e.getMessage()); + } + } + + /** + * Save metadata into storage + * @param table + * @param metadata + * @param overwrite + * @return + */ + private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) { + FSDataOutputStream fsOut = null; + HoodieWrapperFileSystem fs = table.getMetaClient().getFs(); + Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath()); + Path fullPath = new Path(dir, metadata.getFilename()); + try { + byte[] bytes = metadata.toBytes(); + fsOut = fs.create(fullPath, overwrite); + fsOut.write(bytes); + fsOut.close(); + return true; + } catch (IOException e) { + e.printStackTrace(); Review comment: Please clean this up, instead log as error with exception ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.hash.HashID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +public class ConsistentBucketIdentifier extends BucketIdentifier { + + /** + * Hashing metadata of a partition + */ + private final HoodieConsistentHashingMetadata metadata; + /** + * in-memory structure to speed up rang mapping (hashing value -> hashing node) + */ + private TreeMap<Integer, ConsistentHashingNode> ring; Review comment: I actually didn't see us ever re-balancing that ring. Is this going to be added in a follow-up PR? 
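Returning to the `saveMetadata` comment above, a hedged sketch of the suggested cleanup: use try-with-resources and log the failure as an error with the exception attached, instead of `printStackTrace()`. The names follow the diff; treat the rewrite as illustrative.

```java
private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) {
  HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
  Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
  Path fullPath = new Path(dir, metadata.getFilename());
  // try-with-resources closes the stream on every path, including failures.
  try (FSDataOutputStream fsOut = fs.create(fullPath, overwrite)) {
    fsOut.write(metadata.toBytes());
    return true;
  } catch (IOException e) {
    // Log as error with the exception so the stack trace lands in the log, not stderr.
    LOG.error("Failed to save consistent hashing metadata: " + fullPath, e);
  }
  return false;
}
```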
########## File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * All the metadata that is used for consistent hashing bucket index + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class HoodieConsistentHashingMetadata implements Serializable { + + private static final Logger LOG = LogManager.getLogger(HoodieConsistentHashingMetadata.class); + + public static final int MAX_HASH_VALUE = Integer.MAX_VALUE; + public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta"; + + protected short version; + protected String partitionPath; + protected String instant; + protected int numBuckets; + protected int seqNo; + protected List<ConsistentHashingNode> nodes; + + public HoodieConsistentHashingMetadata() { + } + + public HoodieConsistentHashingMetadata(String partitionPath, int numBuckets) { + this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets); + } + + /** + * Construct default metadata with all bucket's file group uuid initialized + * + * @param partitionPath + * @param numBuckets + */ + private HoodieConsistentHashingMetadata(short version, String partitionPath, String instant, int numBuckets) { + this.version = version; + this.partitionPath = partitionPath; + this.instant = instant; + this.numBuckets = numBuckets; + + nodes = new ArrayList<>(); + long step = ((long) MAX_HASH_VALUE + numBuckets - 1) / numBuckets; + for (int i = 1; i <= numBuckets; ++i) { + nodes.add(new ConsistentHashingNode((int) Math.min(step * i, MAX_HASH_VALUE), FSUtils.createNewFileIdPfx())); + } + } + + public short getVersion() { + return version; + } + + public String getPartitionPath() { + return partitionPath; + } + + public String getInstant() { + return instant; + } + + public int getNumBuckets() { + return numBuckets; + } + + public int getSeqNo() { + return seqNo; + } + + public List<ConsistentHashingNode> getNodes() { + return nodes; + } + + public void setInstant(String instant) { + this.instant = instant; + } + + public 
void setNodes(List<ConsistentHashingNode> nodes) { + this.nodes = nodes; + this.numBuckets = nodes.size(); + } + + public void setSeqNo(int seqNo) { + this.seqNo = seqNo; + } + + public String getFilename() { + return instant + HASHING_METADATA_FILE_SUFFIX; + } + + public byte[] toBytes() throws IOException { + return toJsonString().getBytes(StandardCharsets.UTF_8); + } + + public static HoodieConsistentHashingMetadata fromBytes(byte[] bytes) throws IOException { + try { + return fromJsonString(new String(bytes, StandardCharsets.UTF_8), HoodieConsistentHashingMetadata.class); + } catch (Exception e) { + throw new IOException("unable to read hashing metadata", e); + } + } + + private String toJsonString() throws IOException { + return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + } + + protected static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception { + if (jsonStr == null || jsonStr.isEmpty()) { + // For empty commit file (no data or somethings bad happen). + return clazz.newInstance(); + } + return getObjectMapper().readValue(jsonStr, clazz); + } + + protected static ObjectMapper getObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); Review comment: Please check my other comment regarding `ObjectMapper` ########## File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.model; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * All the metadata that is used for consistent hashing bucket index + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class HoodieConsistentHashingMetadata implements Serializable { + + private static final Logger LOG = LogManager.getLogger(HoodieConsistentHashingMetadata.class); + + public static final int MAX_HASH_VALUE = Integer.MAX_VALUE; + public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta"; + + protected short version; + protected String partitionPath; + protected String instant; + protected int numBuckets; + protected int seqNo; + protected List<ConsistentHashingNode> nodes; + + public HoodieConsistentHashingMetadata() { Review comment: As far as I can tell, this object is only serialized/deserialized by Jackson. Let's make it immutable (all fields final, no setters) and use `@JsonCreator` annotations instead; see the sketch after this section. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.storage; + +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.HashSet; +import java.util.Set; + +/** + * Storage layout when using consistent hashing bucket index. + */ +public class HoodieConsistentBucketLayout extends HoodieStorageLayout { + public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = new HashSet<WriteOperationType>() { Review comment: You can use `CollectionUtils.createImmutableSet` for that (also shown in the sketch below). ########## File path: hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
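// A minimal sketch of the two suggestions above, assuming Jackson 2.x and Hudi's
// CollectionUtils. The fields shown are an illustrative subset of the real class, and the
// WriteOperationType values in the second snippet are placeholders, since the diff truncates
// the actual set; this is a sketch, not the PR's implementation.
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;

@JsonIgnoreProperties(ignoreUnknown = true)
public class HoodieConsistentHashingMetadata implements Serializable {
  private final short version;          // all fields final, so no setters are needed
  private final String partitionPath;
  private final int numBuckets;

  @JsonCreator // Jackson instantiates through this constructor instead of a no-arg ctor + setters
  public HoodieConsistentHashingMetadata(@JsonProperty("version") short version,
                                         @JsonProperty("partitionPath") String partitionPath,
                                         @JsonProperty("numBuckets") int numBuckets) {
    this.version = version;
    this.partitionPath = partitionPath;
    this.numBuckets = numBuckets;
  }

  public short getVersion() { return version; }
  public String getPartitionPath() { return partitionPath; }
  public int getNumBuckets() { return numBuckets; }
}

// Likewise, the supported-operations set as a one-liner (operation names are placeholders):
public static final Set<WriteOperationType> SUPPORTED_OPERATIONS =
    CollectionUtils.createImmutableSet(WriteOperationType.INSERT, WriteOperationType.UPSERT);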
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.keygen.KeyGenUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class TestBucketIdentifier { + + public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": [" + + "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", \"type\": \"long\"}]}"; + public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"ts_ms\", \"type\": \"string\"}," + + "{\"name\": \"pii_col\", \"type\": \"string\"}," + + "{\"name\": \"nested_col\",\"type\": " + + NESTED_COL_SCHEMA + "}" + + "]}"; + + @Test + public void testBucketFileId() { + for (int i = 0; i < 1000; i++) { Review comment: What's the point of doing 1000 iterations? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. + * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); + + // TODO maybe parallel + partitions.stream().forEach(p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata); + partitionToIdentifier.put(p, identifier); + }); + } + + /** + * Get bucket location for given key and partition + * + * @param key + * @param partitionPath + * @return + */ + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. 
+ * We add suffix 0 here to the file uuid, following the naming convention. + */ + return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0)); + } + + LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString()); + throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, create a new one (also persist it into storage) + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata + */ + public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) { + int retry = 3; + // TODO maybe use ConsistencyGuard to do the retry thing + // retry to allow concurrent creation of metadata (only one attempt can succeed) + while (retry-- > 0) { + HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition); + if (metadata == null) { + metadata = new HoodieConsistentHashingMetadata(partition, numBuckets); + if (saveMetadata(table, metadata, false)) { + return metadata; + } + } else { + return metadata; + } + } + throw new HoodieIndexException("Failed to load or create metadata, partition: " + partition); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, return null + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata or null if it does not exist + */ + public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) { + Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition); + + try { + if (!table.getMetaClient().getFs().exists(metadataPath)) { + return null; + } + FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath); + HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + Predicate<FileStatus> metaFilePredicate = fileStatus -> { + String filename = fileStatus.getPath().getName(); + return filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX) + && (completedCommits.containsInstant(HoodieConsistentHashingMetadata.getTimestampFromFile(filename)) + || HoodieConsistentHashingMetadata.getTimestampFromFile(filename).equals(HoodieTimeline.INIT_INSTANT_TS)); Review comment: Please extract the repeated `HoodieConsistentHashingMetadata.getTimestampFromFile(filename)` call to a local variable (see the sketch further below). ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. 
+ * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); + + // TODO maybe parallel + partitions.stream().forEach(p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata); + partitionToIdentifier.put(p, identifier); + }); + } + + /** + * Get bucket location for given key and partition + * + * @param key + * @param partitionPath + * @return + */ + @Override + protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. + * We add suffix 0 here to the file uuid, following the naming convention. + */ + return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0)); + } + + LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString()); + throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, create a new one (also persist it into storage) + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata + */ + public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) { + int retry = 3; + // TODO maybe use ConsistencyGuard to do the retry thing + // retry to allow concurrent creation of metadata (only one attempt can succeed) + while (retry-- > 0) { + HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition); + if (metadata == null) { + metadata = new HoodieConsistentHashingMetadata(partition, numBuckets); + if (saveMetadata(table, metadata, false)) { + return metadata; + } + } else { + return metadata; + } + } + throw new HoodieIndexException("Failed to load or create metadata, partition: " + partition); + } + + /** + * Load hashing metadata of the given partition, if it is not existed, return null + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata or null if it does not exist + */ + public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) { + Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition); + + try { + if (!table.getMetaClient().getFs().exists(metadataPath)) { + return null; + } + FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath); + HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + Predicate<FileStatus> metaFilePredicate = fileStatus -> { + String filename = fileStatus.getPath().getName(); + return filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX) + && 
(completedCommits.containsInstant(HoodieConsistentHashingMetadata.getTimestampFromFile(filename)) + || HoodieConsistentHashingMetadata.getTimestampFromFile(filename).equals(HoodieTimeline.INIT_INSTANT_TS)); + }; + + FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate) + .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null); + + if (metaFile != null) { Review comment: General rule of thumb is to return as early as possible, so that it's easier to comprehend control flow while reading top-to-bottom, w/o the need to rewind to understand it. As such, please invert the conditional so the null case returns early; see the sketch after this section. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Simple bucket index implementation, with fixed bucket number. + */ +public class HoodieSimpleBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class); + + /** + * partitionPath -> bucketId -> fileInfo + */ + Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList; Review comment: Why store `Pair<String, String>` instead of `HoodieRecordLocation`? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
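// A sketch of the suggested inversion, which also hoists the repeated
// getTimestampFromFile(filename) call into a local variable per the earlier comment.
// The happy-path body after the null check is elided in this hunk, so it stays a placeholder.
Predicate<FileStatus> metaFilePredicate = fileStatus -> {
  String filename = fileStatus.getPath().getName();
  if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) {
    return false;
  }
  String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
  return completedCommits.containsInstant(timestamp)
      || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
};

FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
    .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
if (metaFile == null) {
  return null; // the absence case exits first, keeping the main path unindented
}
// ... read and deserialize metaFile as in the PR (body not shown in this hunk) ...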
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + private Map<String, ConsistentBucketIdentifier> partitionToIdentifier; + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. Because the hashing metadata created by a write must have 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as a part of the failed write. + * @param instantTime + * @return + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Initialize bucket metadata for each partition + * @param table + * @param partitions partitions that need to be initialized + */ + @Override + protected void initialize(HoodieTable table, List<String> partitions) { + partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3); Review comment: Why over-allocate here? ########## File path: hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Used in consistent hashing index, representing nodes in the consistent hash ring. + * Record the end hash range value and its corresponding file group id. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ConsistentHashingNode implements Serializable { + + private int value; + private String fileIdPfx; + + public ConsistentHashingNode() { + } + + public ConsistentHashingNode(int value, String fileIdPfx) { + this.value = value; + this.fileIdPfx = fileIdPfx; + } + + public static String toJsonString(List<ConsistentHashingNode> nodes) throws IOException { + return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(nodes); + } + + public static List<ConsistentHashingNode> fromJsonString(String json) throws Exception { + if (json == null || json.isEmpty()) { + return Collections.emptyList(); + } + + ConsistentHashingNode[] nodes = getObjectMapper().readValue(json, ConsistentHashingNode[].class); + return Arrays.asList(nodes); + } + + protected static ObjectMapper getObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); Review comment: This isn't a cheap object to throw around, and churning it like that chips away at its ability to cache serializers. Let's instead initialize it as a static object; a sketch follows at the end of this message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
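A minimal sketch of the static `ObjectMapper` suggestion above. The two configuration calls mirror the Jackson imports visible in the diff (`JsonAutoDetect`, `PropertyAccessor`, `DeserializationFeature`) but are assumptions, since the body of `getObjectMapper()` is elided in the hunk:

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;

public class ConsistentHashingNode implements Serializable {
  // Built once and reused: a configured ObjectMapper is thread-safe, and reuse preserves
  // the internal (de)serializer cache that per-call construction throws away.
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
      .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) // assumed setting
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // assumed setting

  public static String toJsonString(List<ConsistentHashingNode> nodes) throws IOException {
    return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(nodes);
  }
}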
