nsivabalan commented on a change in pull request #3173:
URL: https://github.com/apache/hudi/pull/3173#discussion_r743963943
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
##########
@@ -200,6 +202,25 @@
.defaultValue("true")
.withDocumentation("Similar to " +
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
+ // ***** Bucket Index Configs *****
+ public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS =
ConfigProperty
+ .key("hoodie.bucket.index.num.buckets")
+ .defaultValue(256)
+ .withDocumentation("Only applies if index type is BUCKET_INDEX.
Determine the bucket num of the hudi table, "
+ + "and each partition is divided to N buckets.");
+
+ public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD =
ConfigProperty
+ .key("hoodie.bucket.index.hash.field")
+ .noDefaultValue()
Review comment:
should we set `_hoodie_record_key` as the default value? any thoughts.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -76,6 +77,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig c
// TODO : Remove this once we refactor and move out autoCommit method from
here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
this.lastCompletedTxn =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+ // TODO: HUDI-2155 bulk insert support bucket index, HUDI-2156 cluster the
table with bucket index.
+ if (this.config.getIndexType() == HoodieIndex.IndexType.BUCKET_INDEX
+ && (operationType == WriteOperationType.BULK_INSERT || operationType
== WriteOperationType.CLUSTER)) {
Review comment:
https://github.com/apache/hudi/pull/3330
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/utils/BucketUtils.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hudi.common.fs.FSUtils;
+
+public class BucketUtils {
+ // compatible with the spark bucket name
+ private static final Pattern BUCKET_NAME =
Pattern.compile(".*_(\\d+)(?:\\..*)?$");
+
+ public static int mod(int x, int y) {
+ int r = x % y;
+ if (r < 0) {
+ return (r + y) % y;
+ } else {
+ return r;
+ }
+ }
+
+ public static int bucketId(String key, int numBuckets) {
+ return bucketId(Collections.singletonList(key), numBuckets);
+ }
+
+ public static int bucketId(List<Object> values, int numBuckets) {
+ int hash = 0;
+ for (Object value : values) {
+ hash = 31 * hash;
+ if (value == null) {
+ hash += 0;
+ } else if (value instanceof Boolean) {
+ hash += HiveHasher.hashInt(value.equals(Boolean.TRUE) ? 1 : 0);
+ } else if (value instanceof Integer) {
+ hash += HiveHasher.hashInt((Integer) value);
+ } else if (value instanceof Long) {
+ hash += HiveHasher.hashLong((Long) value);
+ } else if (value instanceof Float) {
+ hash += HiveHasher.hashInt(Float.floatToIntBits((Float) value));
+ } else if (value instanceof Double) {
+ hash += HiveHasher.hashLong(Double.doubleToLongBits((Double) value));
+ } else if (value instanceof String) {
+ byte[] a = value.toString().getBytes();
+ hash += HiveHasher.hashUnsafeBytes(a,
HiveHasher.Platform.BYTE_ARRAY_OFFSET, a.length);
+ } else {
+ throw new RuntimeException("Unsupported type " +
value.getClass().getName());
+ }
+ }
+ return mod(hash & Integer.MAX_VALUE, numBuckets);
+ }
+
+ public static int bucketIdFromFileId(String fileId) {
+ return Integer.parseInt(fileId.substring(0, 8));
+ }
+
+ public static String bucketIdStr(int n) {
+ return String.format("%08d", n);
+ }
+
+ public static String bucketIdStr(int n, int m) {
+ return bucketIdStr(mod(n, m));
+ }
+
+ public static String newBucketFileIdPrefix(String bucketId) {
+ return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
Review comment:
Just so I understand correctly: the same bucket ID in different
partitions could have different file IDs, since we are prefixing this random value.
Is my understanding right?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
##########
@@ -36,6 +38,9 @@ public ComplexAvroKeyGenerator(TypedProperties props) {
.split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
+ this.indexKeyFields = props.getStringList(
Review comment:
Did you think of introducing an interface similar to KeyGenerator
(BucketIdentifier or something) which takes in a HoodieRecord and returns an
object representing the bucket the HoodieRecord belongs to? That way, we
don't need to touch KeyGenerator, and only BucketIndex will use this
BucketIdentifier.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import scala.Tuple2;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.SparkBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.utils.BucketUtils;
+
+/**
+ * TODO: pruning empty task partitions.
+ *
+ * @param <T>
+ */
+public class SparkBucketIndexPartitioner<T extends HoodieRecordPayload<T>>
extends
+ SparkHoodiePartitioner<T> {
+
+ private final int numBuckets;
+ private final String hashFunction;
+ private final int totalPartitionPaths;
+ private final List<String> partitionPaths;
+ private final Map<String, Integer> partitionPathOffset;
+ private Map<String, Set<String>> updatePartitionPathFileIds;
+
+ public SparkBucketIndexPartitioner(WorkloadProfile profile,
+ HoodieEngineContext context,
+ HoodieTable table,
+ HoodieWriteConfig config) {
+ super(profile, table);
+ if (!(table.getIndex() instanceof SparkBucketIndex)) {
+ throw new HoodieException(
+ " Bucket index partitioner should only be used by BucketIndex other
than "
+ + table.getIndex().getClass().getSimpleName());
+ }
+ this.numBuckets = ((SparkBucketIndex<T>) table.getIndex()).getNumBuckets();
+ this.hashFunction = config.getBucketIndexHashFunction();
+ this.totalPartitionPaths = profile.getPartitionPaths().size();
+ partitionPaths = new ArrayList<>(profile.getPartitionPaths());
+ partitionPathOffset = new HashMap<>();
+ int i = 0;
+ for (Object partitionPath : profile.getPartitionPaths()) {
+ partitionPathOffset.put(partitionPath.toString(), i);
+ i += numBuckets;
+ }
+ assignUpdates(profile);
+ }
+
+ private void assignUpdates(WorkloadProfile profile) {
+ updatePartitionPathFileIds = new HashMap<>();
+ // each update location gets a partition
+ Set<Entry<String, WorkloadStat>> partitionStatEntries =
profile.getPartitionPathStatMap()
+ .entrySet();
+ for (Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+ if (!updatePartitionPathFileIds.containsKey(partitionStat.getKey())) {
+ updatePartitionPathFileIds.put(partitionStat.getKey(), new
HashSet<>());
+ }
+ for (Entry<String, Pair<String, Long>> updateLocEntry :
+ partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+
updatePartitionPathFileIds.get(partitionStat.getKey()).add(updateLocEntry.getKey());
+ }
+ }
+ }
+
+ @Override
+ public BucketInfo getBucketInfo(int bucketNumber) {
+ String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
+ String bucketId = BucketUtils.bucketIdStr(bucketNumber % numBuckets);
+ Option<String> fileIdOption =
Option.fromJavaOptional(updatePartitionPathFileIds
+ .getOrDefault(partitionPath, Collections.emptySet()).stream()
+ .filter(e -> e.startsWith(bucketId))
+ .findFirst());
+ if (fileIdOption.isPresent()) {
+ return new BucketInfo(BucketType.UPDATE, fileIdOption.get(),
partitionPath);
+ } else {
+ return new BucketInfo(BucketType.INSERT,
BucketUtils.newBucketFileIdPrefix(bucketId), partitionPath);
+ }
+ }
+
+ @Override
+ public int numPartitions() {
+ return totalPartitionPaths * numBuckets;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
+ (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
+ String partitionPath = keyLocation._1.getPartitionPath();
Review comment:
Can't we get the fileId from HoodieRecordLocation and get the bucketId from
it?
That seems more logical to me than generating the bucketId from
HoodieKey's index location. I can understand it if the location does not exist,
but not otherwise.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/serializer/HoodieKeyV1Serializer.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.serializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import
com.esotericsoftware.kryo.serializers.DefaultSerializers.StringSerializer;
+
+import org.apache.hudi.common.model.HoodieKey;
+
+/**
+ * Serializer for HoodieKey V1.
+ */
+public class HoodieKeyV1Serializer extends Serializer {
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ throw new UnsupportedOperationException("Latest serializer should be used
to serialize HoodieKey.");
+ }
+
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
Review comment:
Can you please help me understand why we need this?
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -58,32 +64,50 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
- mergeOnReadPartition.split match {
- case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
- read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
- case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- logFileIterator(logFileOnlySplit, getConfig)
- case skipMergeSplit if skipMergeSplit.mergeType
- .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
- skipMergeFileIterator(
- skipMergeSplit,
- read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
- getConfig
- )
- case payloadCombineSplit if payloadCombineSplit.mergeType
- .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
- payloadCombineFileIterator(
- payloadCombineSplit,
- read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
- getConfig
- )
- case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
- s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
- s"log paths: ${mergeOnReadPartition.split.logPaths.toString}" +
- s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
- s"spark partition Index: ${mergeOnReadPartition.index}" +
- s"merge type: ${mergeOnReadPartition.split.mergeType}")
- }
+ mergeOnReadPartition.split.iterator.flatMap { sp =>
Review comment:
I recommend putting up a separate patch for this fix. It may be good to get
it reviewed separately, as it's not tightly coupled with the bucket index as such.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -146,7 +154,7 @@ class MergeOnReadSnapshotRelation(val sqlContext:
SQLContext,
rdd.asInstanceOf[RDD[Row]]
}
- def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit]
= {
+ def buildFileIndex(filters: Array[Filter]):
List[List[HoodieMergeOnReadFileSplit]] = {
Review comment:
may I know why this change?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
##########
@@ -78,15 +83,29 @@
return (T) SERIALIZER_REF.get().deserialize(objectData);
}
+ public static <T> T deserialize(final byte[] objectData, int version) {
+ if (objectData == null) {
+ throw new IllegalArgumentException("The byte[] must not be null");
+ }
+ if (version == HoodieLogBlock.version) {
Review comment:
Should we fix the previous method to call into this?
```
public static <T> T deserialize(final byte[] objectData) {
deserialize(objectData, HoodieLogBlock.version);
}
```
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
##########
@@ -31,13 +35,16 @@
private String recordKey;
private String partitionPath;
+ private List<Object> indexKey;
- public HoodieKey() {
+ public HoodieKey(String recordKey, String partitionPath) {
+ this(recordKey, partitionPath, Collections.emptyList());
}
- public HoodieKey(String recordKey, String partitionPath) {
+ public HoodieKey(String recordKey, String partitionPath, List<Object>
indexKey) {
Review comment:
HoodieRecord is uniquely identified by the pair of partition path and record
key, and hence we have HoodieKey as a pair of these two. I somehow feel
indexKey is being forcibly fit into HoodieKey, since indexKey is something
required only for BucketIndex and not universal across all indices. Would love
to hear your thoughts on why we have to place index key as part of HoodieKey.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java
##########
@@ -74,23 +74,25 @@ public Partitioner getUpsertPartitioner(WorkloadProfile
profile) {
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String
fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException {
LOG.info("Merging updates for commit " + instantTime + " for file " +
fileId);
-
- if (!table.getIndex().canIndexLogFiles() &&
mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+ if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner
!= null
+ && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + instantTime
+ " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
- } else {
+ } else if (recordItr.hasNext()) {
Review comment:
can you help me understand why we need this change?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
##########
@@ -200,6 +202,25 @@
.defaultValue("true")
.withDocumentation("Similar to " +
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
+ // ***** Bucket Index Configs *****
+ public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS =
ConfigProperty
+ .key("hoodie.bucket.index.num.buckets")
+ .defaultValue(256)
+ .withDocumentation("Only applies if index type is BUCKET_INDEX.
Determine the bucket num of the hudi table, "
+ + "and each partition is divided to N buckets.");
+
+ public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD =
ConfigProperty
+ .key("hoodie.bucket.index.hash.field")
+ .noDefaultValue()
+ .withAlternatives("hoodie.datasource.write.bucket_index_hash.field")
+ .withDocumentation("Index key. It is used to index the record and find
its file group");
+
+ public static final ConfigProperty<String> BUCKET_INDEX_HASH_FUNCTION =
ConfigProperty
+ .key("hoodie.bucket.index.hash.function")
Review comment:
Not required in this patch, but as a follow-up: do we need to add an
interface for the hash function?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkHoodiePartitioner.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.spark.Partitioner;
+
+public abstract class SparkHoodiePartitioner<T extends HoodieRecordPayload<T>>
extends Partitioner
Review comment:
java docs please
##########
File path:
hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
##########
@@ -71,11 +75,24 @@ public final HoodieKey getKey(GenericRecord record) {
}).collect(Collectors.toList());
}
+ public List<String> getIndexKeyFields() {
+ return indexKeyFields;
+ }
+
public List<String> getRecordKeyFields() {
return recordKeyFields;
}
public List<String> getPartitionPathFields() {
return partitionPathFields;
}
+
+ /**
+ * If index key exists, it should be the subset of the record key list.
+ */
+ protected void validateIndexKeyField() {
Review comment:
May I know why we have this constraint? Can't users configure their
own index key fields which do not overlap with record key fields?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/SparkBucketIndex.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.commit.Partitioner;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.utils.BucketUtils;
+import org.apache.spark.api.java.JavaRDD;
+
+public class SparkBucketIndex<T extends HoodieRecordPayload>
+ extends HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> {
+
+ private static final Logger LOG =
LogManager.getLogger(SparkBucketIndex.class);
+
+ private final int numBuckets;
+
+ public SparkBucketIndex(HoodieWriteConfig config) {
+ super(config);
+ numBuckets = config.getBucketIndexNumBuckets();
+ LOG.info("use bucket index, numBuckets=" + numBuckets);
+ }
+
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses,
+ HoodieEngineContext context,
+ HoodieTable hoodieTable)
+ throws HoodieIndexException {
+ return writeStatuses;
+ }
+
+ @Override
+ public HoodieData<HoodieRecord<T>> tagLocation(HoodieData<HoodieRecord<T>>
records,
+ HoodieEngineContext context,
+ HoodieTable hoodieTable)
+ throws HoodieIndexException {
+ // partitionPath -> bucketId -> fileInfo
+ Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList =
new HashMap<>();
+ HoodieData<HoodieRecord<T>> taggedRecords = records.map(record -> {
+ int bucketId = BucketUtils.bucketId(record.getKey().getIndexKey(),
numBuckets,
+ config.getBucketIndexHashFunction());
+ String partitionPath = record.getPartitionPath();
+ if (!partitionPathFileIDList.containsKey(partitionPath)) {
+ partitionPathFileIDList.put(partitionPath,
loadPartitionBucketIdFileIdMapping(hoodieTable, partitionPath));
+ }
+ if (partitionPathFileIDList.get(partitionPath).containsKey(bucketId)) {
Review comment:
I see we are doing a map for each record, and so the partitionPathFileIDList
map will be shared among all executors.
But I'm wondering if you thought about mapPartitions:
```
map record -> Pair (partition, record key)
mapPartitions ( { // we can introduce parallelism here such that we can try
to set it to numPartitions.
here we could process records pertaining to the partitions assigned to the
spark partition.
})
```
And we could leverage a lazy iterative approach within mapPartitions, as we do in
HoodieBloomIndexCheckFunction today.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java
##########
@@ -74,23 +74,25 @@ public Partitioner getUpsertPartitioner(WorkloadProfile
profile) {
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String
fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException {
LOG.info("Merging updates for commit " + instantTime + " for file " +
fileId);
-
- if (!table.getIndex().canIndexLogFiles() &&
mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
+ if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner
!= null
+ && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
LOG.info("Small file corrections for updates for commit " + instantTime
+ " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
- } else {
+ } else if (recordItr.hasNext()) {
HoodieAppendHandle<?,?,?,?> appendHandle = new
HoodieAppendHandle<>(config, instantTime, table,
partitionPath, fileId, recordItr, taskContextSupplier);
appendHandle.doAppend();
return Collections.singletonList(appendHandle.close()).iterator();
+ } else {
+ return super.handleUpdate(partitionPath, fileId, recordItr);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx,
Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to
base files
- if (table.getIndex().canIndexLogFiles()) {
+ if (table.getIndex().canIndexLogFiles() && recordItr.hasNext()) {
Review comment:
same here.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -146,7 +154,7 @@ class MergeOnReadSnapshotRelation(val sqlContext:
SQLContext,
rdd.asInstanceOf[RDD[Row]]
}
- def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit]
= {
+ def buildFileIndex(filters: Array[Filter]):
List[List[HoodieMergeOnReadFileSplit]] = {
Review comment:
@vinothchandar @bvaradar : just wanted to bring to your notice the
changes in here. I am not very conversant w/ this code path. Would appreciate it
if you could review, or loop in someone who can review, the changes in
MergeOnReadSnapshotRelation
HoodieMergeOnReadRDD
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]