robertwb commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r527149063
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
##########
@@ -125,6 +126,45 @@ public Void apply(Iterable<KV<String, Iterable<String>>>
input) {
pipeline.run();
}
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesStatefulParDo.class})
+ public void testWithShardedKeyInGlobalWindow() {
+ // Since with default sharding, the number of subshards of a key is
nondeterministic, create
Review comment:
Could we go with a larger batch size (say 5 or 10) and also verify that
most batches are of the expected size?
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,54 +98,141 @@ public void process(ProcessContext c) {
}
}
- static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+ static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
implements PTransformOverrideFactory<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
GroupIntoBatches<K, V>> {
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey> {
+
+ @Override
+ public PTransformReplacement<PCollection<KV<K, V>>,
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
+ getReplacementTransform(
+ AppliedPTransform<
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey>
+ transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ new
BatchGroupIntoBatchesWithShardedKey<>(transform.getTransform().getBatchSize()));
+ }
+
+ @Override
+ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ /**
+ * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for
bounded Dataflow
+ * pipelines.
+ */
+ static class BatchGroupIntoBatchesWithShardedKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>,
Iterable<V>>>> {
+
+ private final long batchSize;
+
+ private BatchGroupIntoBatchesWithShardedKey(long batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public PCollection<KV<ShardedKey<K>, Iterable<V>>>
expand(PCollection<KV<K, V>> input) {
+ PCollection<KV<ShardedKey<K>, V>> intermediate_input = shardKeys(input);
+ return intermediate_input.apply(new BatchGroupIntoBatches<>(batchSize));
+ }
+ }
+
+ static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
+ implements PTransformOverrideFactory<
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey> {
private final DataflowRunner runner;
- StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+ StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner
runner) {
this.runner = runner;
}
@Override
- public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K,
Iterable<V>>>>
+ public PTransformReplacement<PCollection<KV<K, V>>,
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
GroupIntoBatches<K, V>>
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new StreamingGroupIntoBatches(runner, transform.getTransform()));
+ new StreamingGroupIntoBatchesWithShardedKey<>(
+ runner,
+ transform.getTransform(),
+ PTransformReplacements.getSingletonMainOutput(transform)));
}
@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K,
Iterable<V>>> newOutput) {
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
/**
- * Specialized implementation of {@link GroupIntoBatches} for unbounded
Dataflow pipelines. The
- * override does the same thing as the original transform but additionally
record the input to add
- * corresponding properties during the graph translation.
+ * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for
unbounded Dataflow
+ * pipelines. The override does the same thing as the original transform but
additionally records
+ * the output in order to append required step properties during the graph
translation.
*/
- static class StreamingGroupIntoBatches<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
Iterable<V>>>> {
+ static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>,
Iterable<V>>>> {
private final transient DataflowRunner runner;
- private final GroupIntoBatches<K, V> original;
+ private final GroupIntoBatches<K, V>.WithShardedKey original_transform;
+ private final PCollection<KV<ShardedKey<K>, Iterable<V>>> original_output;
- public StreamingGroupIntoBatches(DataflowRunner runner,
GroupIntoBatches<K, V> original) {
+ public StreamingGroupIntoBatchesWithShardedKey(
+ DataflowRunner runner,
+ GroupIntoBatches<K, V>.WithShardedKey original,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> output) {
this.runner = runner;
- this.original = original;
+ this.original_transform = original;
+ this.original_output = output;
}
@Override
- public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input)
{
- runner.maybeRecordPCollectionWithAutoSharding(input);
- return input.apply(original);
+ public PCollection<KV<ShardedKey<K>, Iterable<V>>>
expand(PCollection<KV<K, V>> input) {
+ // Record the output PCollection of the original transform since the new
output will be
+ // replaced by the original one when the replacement transform is wired
to other nodes in the
+ // graph, although the old and the new outputs are effectively the same.
+ runner.maybeRecordPCollectionWithAutoSharding(original_output);
+ return input.apply(original_transform);
}
}
+
+ private static final long uuid = UUID.randomUUID().getMostSignificantBits();
Review comment:
Call this workerUuid?
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,54 +98,141 @@ public void process(ProcessContext c) {
}
}
- static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+ static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
implements PTransformOverrideFactory<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
GroupIntoBatches<K, V>> {
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey> {
+
+ @Override
+ public PTransformReplacement<PCollection<KV<K, V>>,
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
+ getReplacementTransform(
+ AppliedPTransform<
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey>
+ transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ new
BatchGroupIntoBatchesWithShardedKey<>(transform.getTransform().getBatchSize()));
+ }
+
+ @Override
+ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ /**
+ * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for
bounded Dataflow
+ * pipelines.
+ */
+ static class BatchGroupIntoBatchesWithShardedKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>,
Iterable<V>>>> {
+
+ private final long batchSize;
+
+ private BatchGroupIntoBatchesWithShardedKey(long batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public PCollection<KV<ShardedKey<K>, Iterable<V>>>
expand(PCollection<KV<K, V>> input) {
+ PCollection<KV<ShardedKey<K>, V>> intermediate_input = shardKeys(input);
+ return intermediate_input.apply(new BatchGroupIntoBatches<>(batchSize));
+ }
+ }
+
+ static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
+ implements PTransformOverrideFactory<
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey> {
private final DataflowRunner runner;
- StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+ StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner
runner) {
this.runner = runner;
}
@Override
- public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K,
Iterable<V>>>>
+ public PTransformReplacement<PCollection<KV<K, V>>,
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
GroupIntoBatches<K, V>>
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new StreamingGroupIntoBatches(runner, transform.getTransform()));
+ new StreamingGroupIntoBatchesWithShardedKey<>(
+ runner,
+ transform.getTransform(),
+ PTransformReplacements.getSingletonMainOutput(transform)));
}
@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K,
Iterable<V>>> newOutput) {
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
/**
- * Specialized implementation of {@link GroupIntoBatches} for unbounded
Dataflow pipelines. The
- * override does the same thing as the original transform but additionally
record the input to add
- * corresponding properties during the graph translation.
+ * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for
unbounded Dataflow
+ * pipelines. The override does the same thing as the original transform but
additionally records
+ * the output in order to append required step properties during the graph
translation.
*/
- static class StreamingGroupIntoBatches<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
Iterable<V>>>> {
+ static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>,
Iterable<V>>>> {
private final transient DataflowRunner runner;
- private final GroupIntoBatches<K, V> original;
+ private final GroupIntoBatches<K, V>.WithShardedKey original_transform;
+ private final PCollection<KV<ShardedKey<K>, Iterable<V>>> original_output;
- public StreamingGroupIntoBatches(DataflowRunner runner,
GroupIntoBatches<K, V> original) {
+ public StreamingGroupIntoBatchesWithShardedKey(
+ DataflowRunner runner,
+ GroupIntoBatches<K, V>.WithShardedKey original,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> output) {
this.runner = runner;
- this.original = original;
+ this.original_transform = original;
+ this.original_output = output;
}
@Override
- public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input)
{
- runner.maybeRecordPCollectionWithAutoSharding(input);
- return input.apply(original);
+ public PCollection<KV<ShardedKey<K>, Iterable<V>>>
expand(PCollection<KV<K, V>> input) {
+ // Record the output PCollection of the original transform since the new
output will be
+ // replaced by the original one when the replacement transform is wired
to other nodes in the
+ // graph, although the old and the new outputs are effectively the same.
+ runner.maybeRecordPCollectionWithAutoSharding(original_output);
+ return input.apply(original_transform);
}
}
+
+ private static final long uuid = UUID.randomUUID().getMostSignificantBits();
Review comment:
Maybe take the sum of getMostSignificantBits() and
getLeastSignificantBits()?
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,9 +96,58 @@ public void process(ProcessContext c) {
}
}
+ static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
Review comment:
Well, we currently give different overrides in the two cases for
Dataflow. Everything works, but there are different performance characteristics
that encourage different implementations. (One could argue that this is
runner-specific logic.) Fine not to change now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org