vinothchandar commented on a change in pull request #3173:
URL: https://github.com/apache/hudi/pull/3173#discussion_r702157524
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
##########
@@ -36,6 +38,9 @@ public ComplexAvroKeyGenerator(TypedProperties props) {
.split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
+ this.indexKeyFields = props.getStringList(
Review comment:
Can we keep the key generator unchanged and extract the index field during
actual indexing instead? I find it odd that the key generator has to know about
the index field. I guess you want to do this for efficiency?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
##########
@@ -36,6 +38,9 @@ public ComplexAvroKeyGenerator(TypedProperties props) {
.split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields =
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",")).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
+ this.indexKeyFields = props.getStringList(
Review comment:
Also, why would this be different from the record key fields? We bucket
by those, right? Only then can we make sure that updates to the same key k go
into the same bucket.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/utils/HiveHasher.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import sun.misc.Unsafe;
+
+public class HiveHasher {
Review comment:
This will be something we now have to maintain for a long time. I
would prefer that we use a simpler, performant hashing algorithm that comes with
the JVM.
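To make the suggestion concrete, here is a minimal sketch of bucketing with
hashing that ships with the JVM. The class and method names are hypothetical and
not part of this PR, and it assumes the key values are available as strings. It
relies on `List.hashCode` and `String.hashCode`, whose results are specified in
the Javadoc, so the same key maps to the same bucket on every JVM. It would not
reproduce Hive's bucket assignment, if that compatibility is a requirement.

```java
import java.util.List;

// Hypothetical helper for illustration only; not code from this PR.
final class BucketIdSketch {

  private BucketIdSketch() {
  }

  // Maps the key values to a bucket in [0, numBuckets).
  static int bucketId(List<String> keyValues, int numBuckets) {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive: " + numBuckets);
    }
    // List.hashCode combines the element hashes in a documented way.
    int hash = keyValues.hashCode();
    // floorMod keeps the result non-negative even when the hash is negative.
    return Math.floorMod(hash, numBuckets);
  }
}
```

Using `Math.floorMod` rather than `%` matters here, since `hashCode` can be
negative and `%` would then yield a negative bucket id.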
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
##########
@@ -135,6 +136,20 @@ public static String getRecordPartitionPath(GenericRecord
record, List<String> p
return partitionPath.toString();
}
+ /**
+ * Index key is nullable.
+ */
+ public static List<Object> getIndexKey(GenericRecord record, List<String>
indexKeyFields) {
+ if (indexKeyFields == null || indexKeyFields.size() == 0) {
+ return null;
Review comment:
Let's please not return nulls. Option?
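A rough sketch of what I mean, under some assumptions: it uses
`java.util.Optional` and a plain `Map` as stand-ins so that it compiles on its
own. In the codebase this would presumably use
`org.apache.hudi.common.util.Option` and the Avro `GenericRecord`. The class
name and the field lookup in the non-empty case are illustrative guesses, since
the rest of the method is not visible in this hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the KeyGenUtils method; not code from this PR.
final class IndexKeySketch {

  private IndexKeySketch() {
  }

  // Returns an empty Optional instead of null when no index key fields are set.
  static Optional<List<Object>> getIndexKey(Map<String, Object> record, List<String> indexKeyFields) {
    if (indexKeyFields == null || indexKeyFields.isEmpty()) {
      return Optional.empty();
    }
    List<Object> values = new ArrayList<>(indexKeyFields.size());
    for (String field : indexKeyFields) {
      // Assumed lookup; the real method reads from a GenericRecord.
      values.add(record.get(field));
    }
    return Optional.of(values);
  }
}
```

Callers then have to handle the missing case explicitly, for example with
`isPresent()` or `orElse(...)`, instead of risking a `NullPointerException`.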
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -76,6 +77,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig c
// TODO : Remove this once we refactor and move out autoCommit method from
here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
this.lastCompletedTxn =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+ // TODO: HUDI-2155 bulk insert support bucket index, HUDI-2156 cluster the
table with bucket index.
Review comment:
Can we tackle it in the same PR? Bulk insert is pretty foundational.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/SparkBucketIndex.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.commit.Partitioner;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.index.SparkHoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.utils.BucketUtils;
+import org.apache.spark.api.java.JavaRDD;
+
+public class SparkBucketIndex<T extends HoodieRecordPayload> extends
SparkHoodieIndex<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkBucketIndex.class);
+
+ private final int numBuckets;
+
+ public SparkBucketIndex(HoodieWriteConfig config) {
+ super(config);
+ String indexNumBucket = config.getProps()
+ .getProperty(HoodieIndexConfig.BUCKET_INDEX_BUCKET_NUM.key());
+ if (indexNumBucket == null) {
+ throw new IllegalArgumentException(
+ "Please set hoodie.index.hive.bucket.num or
hoodie.table.numbuckets(deprecated) to a positive integer.");
+ }
+ this.numBuckets = Integer.parseInt(indexNumBucket);
+ LOG.info("use bucket index, numBuckets={}", numBuckets);
+ }
+
+ @Override
+ public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus>
writeStatusRDD,
+ HoodieEngineContext context,
+ HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> hoodieTable)
+ throws HoodieIndexException {
+ return writeStatusRDD;
+ }
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> records,
+ HoodieEngineContext context,
+ HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>> hoodieTable)
+ throws HoodieIndexException {
+ List<String> partitions = records.map(HoodieRecord::getPartitionPath)
Review comment:
I think our goal here has to be to do this in a streaming fashion,
without any sort of caching. Even as it stands now, we will read from the
source twice due to Spark recomputation, since we are not caching any input
RDD.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/SparkBucketIndex.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.commit.Partitioner;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.jetbrains.annotations.NotNull;
Review comment:
jetbrains? Can we avoid this?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/SparkBucketIndex.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.commit.Partitioner;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
Review comment:
log4j?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -76,6 +77,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig c
// TODO : Remove this once we refactor and move out autoCommit method from
here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
this.lastCompletedTxn =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+ // TODO: HUDI-2155 bulk insert support bucket index, HUDI-2156 cluster the
table with bucket index.
+ if (this.config.getIndexType() == HoodieIndex.IndexType.BUCKET_INDEX
+ && (operationType == WriteOperationType.BULK_INSERT || operationType
== WriteOperationType.CLUSTER)) {
Review comment:
We might add new write operations, and this check would let them through,
e.g. the optimize operation introduced by the z-order PR.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/utils/HiveHasher.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import sun.misc.Unsafe;
+
+public class HiveHasher {
Review comment:
Why do we specifically need the hash used by Hive?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -76,6 +77,11 @@ public BaseCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig c
// TODO : Remove this once we refactor and move out autoCommit method from
here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
this.lastCompletedTxn =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
+ // TODO: HUDI-2155 bulk insert support bucket index, HUDI-2156 cluster the
table with bucket index.
+ if (this.config.getIndexType() == HoodieIndex.IndexType.BUCKET_INDEX
+ && (operationType == WriteOperationType.BULK_INSERT || operationType
== WriteOperationType.CLUSTER)) {
Review comment:
Can we rewrite this based on what is actually supported on the bucket index?
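Something along these lines, as a sketch only. The `WriteOp` enum is a
hypothetical stand-in for `WriteOperationType`, and the set of supported
operations shown here is an assumption for illustration, not a statement of
what the bucket index supports today. The point is that the check lists what is
allowed, so any operation added later is rejected until someone adds it
explicitly.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical allowlist check for illustration only; not code from this PR.
final class BucketIndexOpCheckSketch {

  // Stand-in for WriteOperationType, reduced to a few values.
  enum WriteOp { INSERT, UPSERT, DELETE, BULK_INSERT, CLUSTER, OPTIMIZE }

  // Assumed set of operations supported with the bucket index.
  static final Set<WriteOp> SUPPORTED =
      EnumSet.of(WriteOp.INSERT, WriteOp.UPSERT, WriteOp.DELETE);

  private BucketIndexOpCheckSketch() {
  }

  // Rejects every operation that is not explicitly listed as supported.
  static void validate(WriteOp op) {
    if (!SUPPORTED.contains(op)) {
      throw new UnsupportedOperationException(
          "Bucket index does not support operation " + op + "; supported: " + SUPPORTED);
    }
  }
}
```

With the deny-list in the current diff, a new operation passes silently; with
an allowlist it fails fast with a clear message.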