Davis-Zhang-Onehouse commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2220125626
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +130,27 @@ public abstract <I, K, V> List<V> reduceByKey(
public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
public abstract <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+ /**
+ * Groups values by key and applies a function to each group of values.
+ * [1 iterator maps to 1 key] It only guarantees that items returned by the same iterator shares to the same key.
+ * [exact once across iterators] The item returned by the same iterator will not be returned by other iterators.
+ * [1 key maps to >= 1 iterators] Items belong to the same shard can be load-balanced across multiple iterators. It's up to API implementations to decide
+ * load balancing pattern and how many iterators to split into.
+ * [iterator return sorted values] Values returned via iterator is sorted.
+ *
+ * @param data The input pair<ShardIndex, Item> to process.
+ * @param func Function to apply to each group of items with the same shard
+ * @param shardIndices Set of all possible shard indices that may appear in the data. This is used for efficient partitioning and load balancing.
+ * @param preservesPartitioning whether to preserve partitioning in the resulting collection.
+ * @param <S> Type of the shard index (must be Comparable)
+ * @param <V> Type of the value in the input data (must be Comparable)
+ * @param <R> Type of the result
+ * @return Result of applying the function to each group
+ */
+ public <S extends Comparable<S>, V extends Comparable<V>, R> HoodieData<R> processValuesOfTheSameShards(
+ HoodiePairData<S, V> data, SerializableFunction<Iterator<V>, Iterator<R>> func, List<S> shardIndices, boolean preservesPartitioning) {
Review Comment:
> should we enforce that Iterator<V> is sorted?
Each implementation should guarantee this and verify it in its own tests.
> in the first arg to the SerializableFunction parameter?
I'm not sure what you mean exactly. Do you want me to change the type from
Iterator to ClosableSortingIterator? ClosableSortingIterator internally
collects everything and then sorts it, which is not good.
Alternatively, I can implement an iterator wrapper that errors out if it
encounters an out-of-order value, though this comes with some overhead.
Let me know which you prefer.
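
For reference, the wrapper option could look roughly like the sketch below. `SortedCheckingIterator` is a hypothetical name, not an existing Hudi class, and the sketch assumes ascending natural order with duplicates allowed. The overhead is one `compareTo` call per element plus a reference to the previous value; nothing is buffered.

```java
import java.util.Iterator;

// Hypothetical helper, not part of Hudi: wraps an iterator and fails fast
// as soon as a value smaller than its predecessor is served.
final class SortedCheckingIterator<V extends Comparable<V>> implements Iterator<V> {
  private final Iterator<V> delegate;
  private V previous;          // last value handed out, valid only if hasPrevious
  private boolean hasPrevious; // false until the first call to next()

  SortedCheckingIterator(Iterator<V> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public V next() {
    V current = delegate.next();
    // Assumption: ascending order, equal neighbours are acceptable.
    if (hasPrevious && previous.compareTo(current) > 0) {
      throw new IllegalStateException(
          "Values are not sorted: " + previous + " was followed by " + current);
    }
    previous = current;
    hasPrevious = true;
    return current;
  }
}
```

An implementation could wrap the `Iterator<V>` before passing it to `func`, for example only in tests or behind a config flag, so that production paths do not pay for the extra comparison.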