scwhittle commented on code in PR #36112:
URL: https://github.com/apache/beam/pull/36112#discussion_r2354742130
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -157,8 +168,16 @@ public boolean getAllowDuplicates() {
     @Override
     public PCollection<T> expand(PCollection<T> input) {
-      return input
-          .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
+      PCollection<KV<Integer, T>> sharded;
+      if (deterministicSharding) {
+        sharded =
+            input.apply(
+                "Pair with deterministic key",
+                ParDo.of(new AssignDeterministicShardFn<T>(numBuckets)));
Review Comment:
One idea would be to have a pipeline-level test verify that
AssignDeterministicShardFn is used, and then a unit test of
AssignDeterministicShardFn that just creates a fake ProcessContext to feed it
input and verify the output.
But see the other comment before doing that.
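Something along these lines, just as a sketch: it assumes
AssignDeterministicShardFn<T> extends DoFn<T, KV<Integer, T>> and keeps a
public @ProcessElement method taking a ProcessContext; if it takes
@Element/OutputReceiver parameters instead, point the fake at those.

```java
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class AssignDeterministicShardFnTest {
  @Test
  @SuppressWarnings({"unchecked", "rawtypes"})
  public void sameElementAlwaysGetsSameShard() throws Exception {
    // Fn under test from this PR; constructor arg is the bucket count.
    AssignDeterministicShardFn<String> fn = new AssignDeterministicShardFn<>(4);

    // Fake ProcessContext that always hands back the same element.
    DoFn<String, KV<Integer, String>>.ProcessContext ctx = mock(DoFn.ProcessContext.class);
    when(ctx.element()).thenReturn("record-a");

    // Deterministic sharding must emit the same key for repeated inputs.
    fn.processElement(ctx);
    fn.processElement(ctx);

    ArgumentCaptor<KV<Integer, String>> out = ArgumentCaptor.forClass((Class) KV.class);
    verify(ctx, times(2)).output(out.capture());
    List<KV<Integer, String>> outputs = out.getAllValues();
    assertEquals(outputs.get(0), outputs.get(1));
  }
}
```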
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -132,33 +133,55 @@ public void processElement(
   public static class RedistributeArbitrarily<T>
       extends PTransform<PCollection<T>, PCollection<T>> {
     // The number of buckets to shard into.
-    // A runner is free to ignore this (a runner may ignore the transorm
+    // A runner is free to ignore this (a runner may ignore the transform
     // entirely!) This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets = null;
     private boolean allowDuplicates = false;
+    private boolean deterministicSharding = false;

-    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+    private RedistributeArbitrarily(
+        @Nullable Integer numBuckets, boolean allowDuplicates, boolean deterministicSharding) {
       this.numBuckets = numBuckets;
       this.allowDuplicates = allowDuplicates;
+      this.deterministicSharding = deterministicSharding;
     }

     public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
-      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+      return new RedistributeArbitrarily<>(
+          numBuckets, this.allowDuplicates, this.deterministicSharding);
     }

     public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
-      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+      return new RedistributeArbitrarily<>(
+          this.numBuckets, allowDuplicates, this.deterministicSharding);
+    }
+
+    public RedistributeArbitrarily<T> withDeterministicSharding(boolean deterministicSharding) {
Review Comment:
This seems dangerous as-is, because it is unclear that it is going to use the
offset for the sharding, and general users will not have that set and will just
end up going to a single key.
Another idea would be to fingerprint the timestamp/window instead or as well
(fingerprinting the T itself seems difficult without encoding it), but see my
comment in KafkaIO about possibly just doing it there instead of changing
Redistribute.arbitrarily
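Roughly what I mean by the fingerprinting idea, just to illustrate;
AssignTimestampWindowShardFn is a made-up name, not part of this PR, and a
real version would likely fingerprint the encoded window rather than rely on
hashCode():

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Derives the shard from the element's timestamp and window, so callers get
// spread across keys even when no offset-based shard was set upstream.
class AssignTimestampWindowShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private final int numBuckets;

  AssignTimestampWindowShardFn(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @ProcessElement
  public void processElement(
      @Element T element,
      @Timestamp Instant timestamp,
      BoundedWindow window,
      OutputReceiver<KV<Integer, T>> out) {
    // Mix timestamp and window into a non-negative bucket index.
    int hash = 31 * Long.hashCode(timestamp.getMillis()) + window.hashCode();
    out.output(KV.of(Math.floorMod(hash, numBuckets), element));
  }
}
```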
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1845,18 +1846,16 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
             "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
       }
-      if (kafkaRead.getRedistributeNumKeys() == 0) {
-        return output.apply(
-            "Insert Redistribute",
-            Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
-      } else {
-        return output.apply(
-            "Insert Redistribute with Shards",
-            Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                .withAllowDuplicates(kafkaRead.isAllowDuplicates())
-                .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
+      RedistributeArbitrarily<KafkaRecord<K, V>> redistribute =
+          Redistribute.<KafkaRecord<K, V>>arbitrarily()
+              .withAllowDuplicates(kafkaRead.isAllowDuplicates());
+      if (kafkaRead.getOffsetDeduplication() != null && kafkaRead.getOffsetDeduplication()) {
+        redistribute = redistribute.withDeterministicSharding(true);
Review Comment:
Thinking about this some more, another option would be to not make this part
of Redistribute.arbitrarily, but instead to assign deterministic keys as part
of the KafkaIO read. We could, for example, redistribute records by a key based
upon the K within KafkaRecord<K, V>, or have some option to use shards based
upon the offset like we are doing within Redistribute.arbitrarily.
I believe @stankiewicz indicated that this would be more useful anyway for the
customers we were hoping would benefit from this feature. And it keeps the
complexity of producing a deterministic output within KafkaIO instead of
Redistribute.
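Sketching the KafkaIO-side idea; AssignKafkaShardFn is a made-up name, and a
real version would fingerprint the encoded key rather than rely on hashCode():

```java
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Keys each record by its Kafka key when present, falling back to the record
// offset so unkeyed topics still shard deterministically.
class AssignKafkaShardFn<K, V>
    extends DoFn<KafkaRecord<K, V>, KV<Integer, KafkaRecord<K, V>>> {
  private final int numBuckets;

  AssignKafkaShardFn(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @ProcessElement
  public void processElement(
      @Element KafkaRecord<K, V> record,
      OutputReceiver<KV<Integer, KafkaRecord<K, V>>> out) {
    K key = record.getKV().getKey();
    // hashCode() is only stable within a JVM for arbitrary K; illustrative only.
    int hash = key != null ? key.hashCode() : Long.hashCode(record.getOffset());
    out.output(KV.of(Math.floorMod(hash, numBuckets), record));
  }
}
```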
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1845,18 +1846,16 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
             "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
       }
-      if (kafkaRead.getRedistributeNumKeys() == 0) {
-        return output.apply(
-            "Insert Redistribute",
-            Redistribute.<KafkaRecord<K, V>>arbitrarily()
-                .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
-      } else {
-        return output.apply(
-            "Insert Redistribute with Shards",
Review Comment:
This step name is still changing.