yihua commented on a change in pull request #4441:
URL: https://github.com/apache/hudi/pull/4441#discussion_r814506957
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Repartition input records into at least expected number of output spark
partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. -
Average records per output spark
* partitions should be almost equal to (#inputRecords /
#outputSparkPartitions) to avoid possible skews.
*/
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+ private WriteHandleFactory defaultWriteHandleFactory;
+ private List<String> fileIdPfx;
/**
- * Repartitions the input records into at least expected number of output
spark partitions.
+ * Repartitions the input records into at least expected number of output
spark partitions,
+ * and generates fileIdPfx for each partition.
*
* @param records Input Hoodie records
* @param outputSparkPartitions Expected number of output partitions
* @return
*/
- I repartitionRecords(I records, int outputSparkPartitions);
+ public abstract I repartitionRecords(I records, int outputSparkPartitions);
/**
* @return {@code true} if the records within a partition are sorted; {@code
false} otherwise.
*/
- boolean arePartitionRecordsSorted();
+ public abstract boolean arePartitionRecordsSorted();
+
+ public List<String> getFileIdPfx() {
+ return fileIdPfx;
+ }
+
+ public void setDefaultWriteHandleFactory(WriteHandleFactory
defaultWriteHandleFactory) {
+ this.defaultWriteHandleFactory = defaultWriteHandleFactory;
+ }
+
+ /**
+ * Return write handle factory for the given partition.
+ * By default, return the pre-assigned write handle factory for all
partitions
+ * @param partition data partition
+ * @return
+ */
+ public WriteHandleFactory getWriteHandleFactory(int partition) {
+ return defaultWriteHandleFactory;
+ }
+
+ /**
+ * Initialize a list of file id prefix randomly.
+ * In most cases, bulk_insert put all incoming records to randomly generated
file groups (i.e., the current default implementation).
Review comment:
nit: "randomly generated file groups"
-> "randomly generated file group ID prefixes"
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Repartition input records into at least expected number of output spark
partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. -
Average records per output spark
* partitions should be almost equal to (#inputRecords /
#outputSparkPartitions) to avoid possible skews.
*/
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
Review comment:
This interface is public and users may implement their own bulk insert
partitioner as a plugin. The change from interface to abstract class is not
backward compatible. Could you keep it as an interface and use `default`
methods for new logic?
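
A minimal sketch of what keeping the interface could look like, under stated assumptions. `PartitionerSketch`, `HandleFactoryStub`, and `LegacyPartitionerSketch` are hypothetical stand-ins rather than Hudi classes, and the UUID-based prefix is only a placeholder for whatever prefix generation the PR uses. The point is that an existing plugin implementing only the two original methods still compiles:

```java
import java.io.Serializable;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for org.apache.hudi.io.WriteHandleFactory.
class HandleFactoryStub implements Serializable {}

// Sketch: the partitioner stays an interface; new logic arrives as default methods.
interface PartitionerSketch<I> extends Serializable {

  // The two original abstract methods are unchanged.
  I repartitionRecords(I records, int outputSparkPartitions);

  boolean arePartitionRecordsSorted();

  // New method with a default body (placeholder: a random UUID string).
  default String getFileIdPfx(int partitionId) {
    return UUID.randomUUID().toString();
  }

  // New method with a default body (placeholder: a fresh stub factory).
  default HandleFactoryStub getWriteHandleFactory(int partitionId) {
    return new HandleFactoryStub();
  }
}

// A pre-existing user plugin: it implements only the original methods
// and needs no source change to pick up the new defaults.
class LegacyPartitionerSketch implements PartitionerSketch<List<String>> {
  @Override
  public List<String> repartitionRecords(List<String> records, int outputSparkPartitions) {
    return records;
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return false;
  }
}
```

Note that an interface cannot hold instance fields, so state such as a pre-assigned factory would have to live in the implementing class or be supplied some other way.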
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Repartition input records into at least expected number of output spark
partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. -
Average records per output spark
* partitions should be almost equal to (#inputRecords /
#outputSparkPartitions) to avoid possible skews.
*/
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+ private WriteHandleFactory defaultWriteHandleFactory;
+ private List<String> fileIdPfx;
/**
- * Repartitions the input records into at least expected number of output
spark partitions.
+ * Repartitions the input records into at least expected number of output
spark partitions,
+ * and generates fileIdPfx for each partition.
*
* @param records Input Hoodie records
* @param outputSparkPartitions Expected number of output partitions
* @return
*/
- I repartitionRecords(I records, int outputSparkPartitions);
+ public abstract I repartitionRecords(I records, int outputSparkPartitions);
/**
* @return {@code true} if the records within a partition are sorted; {@code
false} otherwise.
*/
- boolean arePartitionRecordsSorted();
+ public abstract boolean arePartitionRecordsSorted();
+
+ public List<String> getFileIdPfx() {
+ return fileIdPfx;
+ }
+
+ public void setDefaultWriteHandleFactory(WriteHandleFactory
defaultWriteHandleFactory) {
Review comment:
Should the `setDefaultWriteHandleFactory()` functionality be implemented
through the constructor instead, with `defaultWriteHandleFactory` passed in? e.g.,
```
public GlobalSortPartitioner(WriteHandleFactory defaultWriteHandleFactory);
```
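
To illustrate, here is a rough sketch of constructor injection. `GlobalSortPartitionerSketch` and `WriteHandleFactoryStub` are hypothetical names used only for illustration and do not reflect the actual Hudi classes. With the factory passed in at construction, the field can be `final` and no setter is needed:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for org.apache.hudi.io.WriteHandleFactory.
class WriteHandleFactoryStub implements Serializable {}

// Sketch: the default factory is supplied once, at construction time.
class GlobalSortPartitionerSketch implements Serializable {

  private final WriteHandleFactoryStub defaultWriteHandleFactory;

  GlobalSortPartitionerSketch(WriteHandleFactoryStub defaultWriteHandleFactory) {
    // Fail fast instead of leaving the field unset until a setter is called.
    this.defaultWriteHandleFactory =
        Objects.requireNonNull(defaultWriteHandleFactory, "defaultWriteHandleFactory");
  }

  // By default, every partition gets the same pre-assigned factory.
  WriteHandleFactoryStub getWriteHandleFactory(int partition) {
    return defaultWriteHandleFactory;
  }
}
```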
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
##########
@@ -41,27 +41,24 @@
private boolean areRecordsSorted;
private HoodieWriteConfig config;
private HoodieTable hoodieTable;
- private List<String> fileIDPrefixes;
private boolean useWriterSchema;
- private WriteHandleFactory writeHandleFactory;
+ private BulkInsertPartitioner partitioner;
public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable
hoodieTable,
- List<String> fileIDPrefixes, boolean
useWriterSchema,
- WriteHandleFactory writeHandleFactory) {
+ boolean useWriterSchema, BulkInsertPartitioner
partitioner) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
- this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
- this.writeHandleFactory = writeHandleFactory;
+ this.partitioner = partitioner;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config,
instantTime, hoodieTable,
- fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(),
useWriterSchema,
- writeHandleFactory);
+ (String)partitioner.getFileIdPfx().get(partition),
hoodieTable.getTaskContextSupplier(), useWriterSchema,
Review comment:
If the partitioner just stores the function, it's better to apply the
partition ID -> file ID prefix mapping here.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Repartition input records into at least expected number of output spark
partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. -
Average records per output spark
* partitions should be almost equal to (#inputRecords /
#outputSparkPartitions) to avoid possible skews.
*/
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+ private WriteHandleFactory defaultWriteHandleFactory;
+ private List<String> fileIdPfx;
Review comment:
nit: rename to `fileIdPrefixList`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
* Repartition input records into at least expected number of output spark
partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. -
Average records per output spark
* partitions should be almost equal to (#inputRecords /
#outputSparkPartitions) to avoid possible skews.
*/
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+ private WriteHandleFactory defaultWriteHandleFactory;
+ private List<String> fileIdPfx;
Review comment:
After looking at this PR as a whole, I'm thinking that it may be better
to store a generating function of partitionId -> fileIdPrefix, and partitionId
-> writeHandleFactory and have those functions passed in from the constructor.
Do you have a PoC of a `BulkInsertPartitioner` implementation that provides
partition-specific file ID and write handle factory? I'd like to understand
how these are coupled with the repartition logic and how the interface design
can accommodate the use case.
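
A rough sketch of the function-based design, under stated assumptions. `SerFn`, `FactoryStub`, and `FunctionBackedPartitionerSketch` are hypothetical names, not Hudi classes. The functions are declared serializable because the partitioner itself is `Serializable` in this PR:

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical serializable function type, so the partitioner stays serializable.
interface SerFn<T, R> extends Function<T, R>, Serializable {}

// Hypothetical stand-in for org.apache.hudi.io.WriteHandleFactory.
class FactoryStub implements Serializable {
  final String name;

  FactoryStub(String name) {
    this.name = name;
  }
}

// Sketch: the partitioner stores two generating functions instead of a
// precomputed prefix list and a mutable default factory.
class FunctionBackedPartitionerSketch implements Serializable {

  private final SerFn<Integer, String> fileIdPrefixFn;
  private final SerFn<Integer, FactoryStub> writeHandleFactoryFn;

  FunctionBackedPartitionerSketch(SerFn<Integer, String> fileIdPrefixFn,
                                  SerFn<Integer, FactoryStub> writeHandleFactoryFn) {
    this.fileIdPrefixFn = fileIdPrefixFn;
    this.writeHandleFactoryFn = writeHandleFactoryFn;
  }

  // partitionId -> fileIdPrefix, evaluated on demand.
  String getFileIdPfx(int partitionId) {
    return fileIdPrefixFn.apply(partitionId);
  }

  // partitionId -> writeHandleFactory, evaluated on demand.
  FactoryStub getWriteHandleFactory(int partitionId) {
    return writeHandleFactoryFn.apply(partitionId);
  }
}
```

A caller could then construct it as `new FunctionBackedPartitionerSketch(i -> "pfx-" + i, i -> new FactoryStub("create"))` and look up the prefix and factory per partition at the point of use.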
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
##########
@@ -64,7 +65,11 @@ public JavaSortAndSizeExecutionStrategy(HoodieTable table,
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(),
Boolean.FALSE.toString());
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig =
HoodieWriteConfig.newBuilder().withProps(props).build();
+
+ BulkInsertPartitioner partitioner = getPartitioner(strategyParams, schema);
+ partitioner.setDefaultWriteHandleFactory(new
CreateHandleFactory(preserveHoodieMetadata));
Review comment:
This can be achieved through the constructor.
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
##########
@@ -50,6 +50,7 @@ public JavaCustomColumnsSortPartitioner(String[] columnNames,
Schema schema, boo
@Override
public List<HoodieRecord<T>> repartitionRecords(
List<HoodieRecord<T>> records, int outputSparkPartitions) {
+ generateFileIdPfx(outputSparkPartitions);
Review comment:
I'm wondering if this can be achieved by a function (`func`) passed to the
constructor, with logic something like `IntStream.range(0,
outputSparkPartitions).mapToObj(i -> func.apply(i))`?
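
For concreteness, the stream expression could be wrapped up roughly as follows. `FileIdPrefixesSketch` is a hypothetical helper name, and `IntFunction<String>` is only one possible type for `func`:

```java
import java.util.List;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: derives one file ID prefix per output partition
// from a caller-supplied function, instead of calling a generator
// inside repartitionRecords().
class FileIdPrefixesSketch {

  static List<String> generate(int outputSparkPartitions, IntFunction<String> func) {
    return IntStream.range(0, outputSparkPartitions)
        .mapToObj(i -> func.apply(i))
        .collect(Collectors.toList());
  }
}
```

Here `generate(3, i -> "p" + i)` yields the list `[p0, p1, p2]`, one entry per partition index.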
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
##########
@@ -117,7 +116,7 @@ public static JavaBulkInsertHelper newInstance() {
new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
config, instantTime, table,
fileIdPrefixProvider.createFilePrefix(""),
table.getTaskContextSupplier(),
- new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+
partitioner.getWriteHandleFactory(0)).forEachRemaining(writeStatuses::addAll);
Review comment:
This looks hacky: the partition index `0` is hardcoded here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.