bvaradar commented on code in PR #7834:
URL: https://github.com/apache/hudi/pull/7834#discussion_r1149970489
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java:
##########
@@ -38,9 +38,12 @@ public static BulkInsertPartitioner get(HoodieTable table,
public static BulkInsertPartitioner get(HoodieTable table,
HoodieWriteConfig config,
boolean enforceNumOutputPartitions) {
- if (config.getIndexType().equals(HoodieIndex.IndexType.BUCKET)
- &&
config.getBucketIndexEngineType().equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING))
{
- return new RDDConsistentBucketPartitioner(table);
+ if (config.getIndexType().equals(HoodieIndex.IndexType.BUCKET)) {
+ if
(config.getBucketIndexEngineType().equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING))
{
+ return new RDDConsistentBucketPartitioner(table);
+ } else if
(config.getBucketIndexEngineType().equals(HoodieIndex.BucketIndexEngineType.SIMPLE))
{
+ return new RDDSimpleBucketPartitioner(table);
+ }
Review Comment:
In the else case, should we throw an exception?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java:
##########
@@ -44,7 +44,7 @@ public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
super(config);
}
- private Map<Integer, HoodieRecordLocation>
loadPartitionBucketIdFileIdMapping(
+ public Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
Review Comment:
Minor: Rename method to loadBucketIdToFileIdMappingForPartition
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexPartitioner.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
+import org.apache.hudi.io.AppendHandleFactory;
+import org.apache.hudi.io.SingleFileHandleCreateFactory;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Abstract of bucket index bulk_insert partitioner
+ */
+public abstract class BucketIndexPartitioner<T> implements
BulkInsertPartitioner<T> {
Review Comment:
Can you rename this to BucketIndexBulkInsertPartitioner? Also, please make a
similar name change to the derived classes.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RDDSimpleBucketPartitioner<T extends HoodieRecordPayload> extends
RDDBucketIndexPartitioner<T> {
+
+ public RDDSimpleBucketPartitioner(HoodieTable table) {
+ super(table, null, false);
+ ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSimpleBucketIndex);
+ }
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records, int outputPartitions) {
+ HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex();
+ HashMap<String, Integer> fileIdToIdx = new HashMap<>();
Review Comment:
Minor: rename to fileIdToBucketIndex
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RDDSimpleBucketPartitioner<T extends HoodieRecordPayload> extends
RDDBucketIndexPartitioner<T> {
Review Comment:
Please make a similar class name change here, e.g. RDDSimpleBucketBulkInsertPartitioner.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRDDSimpleBucketPartitioner extends HoodieClientTestHarness {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initPath();
+ initSparkContexts("TestRDDSimpleBucketPartitioner");
+ initFileSystem();
+ initTimelineService();
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ cleanupResources();
+ }
+
+ @ParameterizedTest
+ @MethodSource("configParams")
+ public void testSimpleBucketPartitioner(HoodieTableType type, boolean
partitionSort) throws IOException {
+ HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath,
type);
+ int bucketNum = 2;
+ HoodieWriteConfig config = HoodieWriteConfig
+ .newBuilder()
+ .withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .build();
+ config.setValue(HoodieIndexConfig.INDEX_TYPE,
HoodieIndex.IndexType.BUCKET.name());
+ config.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE,
HoodieIndex.BucketIndexEngineType.SIMPLE.name());
+ config.setValue(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, "_row_key");
+ config.setValue(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS, "" +
bucketNum);
+ if (partitionSort) {
+ config.setValue(HoodieWriteConfig.BULK_INSERT_SORT_MODE,
BulkInsertSortMode.PARTITION_SORT.name());
+ }
+
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ List<HoodieRecord> records = dataGenerator.generateInserts("0", 100);
+ HoodieJavaRDD<HoodieRecord> javaRDD = HoodieJavaRDD.of(records, context,
1);
+ javaRDD.map(HoodieRecord::getPartitionPath).count();
Review Comment:
This line can be removed.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RDDSimpleBucketPartitioner<T extends HoodieRecordPayload> extends
RDDBucketIndexPartitioner<T> {
+
+ public RDDSimpleBucketPartitioner(HoodieTable table) {
+ super(table, null, false);
+ ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSimpleBucketIndex);
+ }
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records, int outputPartitions) {
+ HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex();
+ HashMap<String, Integer> fileIdToIdx = new HashMap<>();
+
+ // Map <partition, <bucketNo, fileID>>
+ Map<String, HashMap<Integer, String>> partitionMapper =
getPartitionMapper(records, fileIdToIdx);
+
+ return doPartition(records, new Partitioner() {
+ @Override
+ public int numPartitions() {
+ return index.getNumBuckets() * partitionMapper.size();
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ HoodieKey hoodieKey = (HoodieKey) key;
+ String partitionPath = hoodieKey.getPartitionPath();
+ int bucketID = index.getBucketID(hoodieKey);
+ String fileID = partitionMapper.get(partitionPath).get(bucketID);
+ return fileIdToIdx.get(fileID);
+ }
+ });
+ }
+
+ Map<String, HashMap<Integer, String>>
getPartitionMapper(JavaRDD<HoodieRecord<T>> records,
+ HashMap<String,
Integer> fileIdToIdx) {
Review Comment:
Kindly use the interface type Map<String, Integer> instead of concrete
implementations whenever possible (for type declarations of variables and parameters).
The rest of the codebase usually follows that convention.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestRDDSimpleBucketPartitioner extends HoodieClientTestHarness {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initPath();
+ initSparkContexts("TestRDDSimpleBucketPartitioner");
+ initFileSystem();
+ initTimelineService();
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ cleanupResources();
+ }
+
+ @ParameterizedTest
+ @MethodSource("configParams")
+ public void testSimpleBucketPartitioner(HoodieTableType type, boolean
partitionSort) throws IOException {
+ HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath,
type);
+ int bucketNum = 2;
+ HoodieWriteConfig config = HoodieWriteConfig
+ .newBuilder()
+ .withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .build();
+ config.setValue(HoodieIndexConfig.INDEX_TYPE,
HoodieIndex.IndexType.BUCKET.name());
+ config.setValue(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE,
HoodieIndex.BucketIndexEngineType.SIMPLE.name());
+ config.setValue(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, "_row_key");
+ config.setValue(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS, "" +
bucketNum);
+ if (partitionSort) {
+ config.setValue(HoodieWriteConfig.BULK_INSERT_SORT_MODE,
BulkInsertSortMode.PARTITION_SORT.name());
+ }
+
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ List<HoodieRecord> records = dataGenerator.generateInserts("0", 100);
+ HoodieJavaRDD<HoodieRecord> javaRDD = HoodieJavaRDD.of(records, context,
1);
+ javaRDD.map(HoodieRecord::getPartitionPath).count();
+
+ final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+ BulkInsertPartitioner partitioner =
BulkInsertInternalPartitionerFactory.get(table, config);
+ JavaRDD<HoodieRecord> repartitionRecords =
+ (JavaRDD<HoodieRecord>)
partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(javaRDD), 1);
+
+ assertEquals(bucketNum *
javaRDD.map(HoodieRecord::getPartitionPath).distinct().count(),
Review Comment:
Can you extend this test as follows: perform repartitionRecords after a
first write, so that the partitions and file-ids already exist; then, in a
second write using the same records as the first batch, check that the records
map to the same bucketIds as expected.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RDDSimpleBucketPartitioner<T extends HoodieRecordPayload> extends
RDDBucketIndexPartitioner<T> {
+
+ public RDDSimpleBucketPartitioner(HoodieTable table) {
+ super(table, null, false);
+ ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSimpleBucketIndex);
+ }
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records, int outputPartitions) {
+ HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex();
+ HashMap<String, Integer> fileIdToIdx = new HashMap<>();
+
+ // Map <partition, <bucketNo, fileID>>
+ Map<String, HashMap<Integer, String>> partitionMapper =
getPartitionMapper(records, fileIdToIdx);
+
+ return doPartition(records, new Partitioner() {
+ @Override
+ public int numPartitions() {
+ return index.getNumBuckets() * partitionMapper.size();
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ HoodieKey hoodieKey = (HoodieKey) key;
+ String partitionPath = hoodieKey.getPartitionPath();
+ int bucketID = index.getBucketID(hoodieKey);
+ String fileID = partitionMapper.get(partitionPath).get(bucketID);
+ return fileIdToIdx.get(fileID);
+ }
+ });
+ }
+
+ Map<String, HashMap<Integer, String>>
getPartitionMapper(JavaRDD<HoodieRecord<T>> records,
+ HashMap<String,
Integer> fileIdToIdx) {
Review Comment:
fileIdToIdx -> fileIdToBucketIndex
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketPartitioner.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RDDSimpleBucketPartitioner<T extends HoodieRecordPayload> extends
RDDBucketIndexPartitioner<T> {
+
+ public RDDSimpleBucketPartitioner(HoodieTable table) {
+ super(table, null, false);
+ ValidationUtils.checkArgument(table.getIndex() instanceof
HoodieSimpleBucketIndex);
+ }
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>>
records, int outputPartitions) {
+ HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex();
+ HashMap<String, Integer> fileIdToIdx = new HashMap<>();
+
+ // Map <partition, <bucketNo, fileID>>
+ Map<String, HashMap<Integer, String>> partitionMapper =
getPartitionMapper(records, fileIdToIdx);
+
+ return doPartition(records, new Partitioner() {
+ @Override
+ public int numPartitions() {
+ return index.getNumBuckets() * partitionMapper.size();
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ HoodieKey hoodieKey = (HoodieKey) key;
+ String partitionPath = hoodieKey.getPartitionPath();
+ int bucketID = index.getBucketID(hoodieKey);
+ String fileID = partitionMapper.get(partitionPath).get(bucketID);
+ return fileIdToIdx.get(fileID);
+ }
+ });
+ }
+
+ Map<String, HashMap<Integer, String>>
getPartitionMapper(JavaRDD<HoodieRecord<T>> records,
+ HashMap<String,
Integer> fileIdToIdx) {
+
+ HoodieSimpleBucketIndex index = (HoodieSimpleBucketIndex) table.getIndex();
+ int numBuckets = index.getNumBuckets();
+ return records
+ .map(HoodieRecord::getPartitionPath)
+ .distinct().collect().stream()
+ .collect(Collectors.toMap(p -> p, p -> {
+ Map<Integer, HoodieRecordLocation> locationMap =
index.loadPartitionBucketIdFileIdMapping(table, p);
+ HashMap<Integer, String> fileIdMap = new HashMap<>();
Review Comment:
Rename to bucketIdToFileIdMap.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]