chamikaramj commented on a change in pull request #15731:
URL: https://github.com/apache/beam/pull/15731#discussion_r737015128
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
##########
@@ -716,13 +716,13 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
       translator.registerOutputName(value, name);
       // If the output requires runner determined sharding, also append necessary input properties.
-      if (value instanceof PCollection
-          && translator.runner.doesPCollectionRequireAutoSharding((PCollection<?>) value)) {
-        addInput(PropertyNames.ALLOWS_SHARDABLE_STATE, "true");
-        // Currently we only allow auto-sharding to be enabled through the GroupIntoBatches
-        // transform. So we also add the following property which GroupIntoBatchesDoFn has, to allow
-        // the backend to perform graph optimization.
-        addInput(PropertyNames.PRESERVES_KEYS, "true");
+      if (value instanceof PCollection) {
+        if (translator.runner.doesPCollectionRequireAutoSharding((PCollection<?>) value)) {
+          addInput(PropertyNames.ALLOWS_SHARDABLE_STATE, "true");
+        }
+        if (translator.runner.doesPCollectionPreserveKeys((PCollection<?>) value)) {
+          addInput(PropertyNames.PRESERVES_KEYS, "true");
Review comment:
I think we need a similar update in the Dataflow service to support
Portable Job Submission.
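
For concreteness, a minimal sketch of a test asserting that the two properties are now independent, modeled on the tests added later in this PR. It assumes the surrounding test class's imports and the runBatchGroupIntoBatchesAndGetJobSpec helper from the diff; the expectation that ALLOWS_SHARDABLE_STATE is absent in the batch case is my assumption, not something the PR states:

    @Test
    public void testPreservesKeysIsIndependentOfShardableState() throws Exception {
      JobSpecification jobSpec =
          runBatchGroupIntoBatchesAndGetJobSpec(false, Collections.emptyList());
      List<Step> steps = jobSpec.getJob().getSteps();
      Map<String, Object> properties = steps.get(steps.size() - 1).getProperties();
      // PRESERVES_KEYS no longer depends on the auto-sharding condition.
      assertEquals("true", getString(properties, PropertyNames.PRESERVES_KEYS));
      // Assumed: batch GroupIntoBatches does not request runner-determined
      // sharding, so the shardable-state property should not be attached.
      assertFalse(properties.containsKey(PropertyNames.ALLOWS_SHARDABLE_STATE));
    }

Under the old combined condition this could not be expressed, since PRESERVES_KEYS was only emitted when auto-sharding was required.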
##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -152,14 +180,84 @@ public void process(ProcessContext c) {
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {

     private final BatchingParams<V> batchingParams;
+    private final transient DataflowRunner runner;
+    private final transient PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput;

-    private BatchGroupIntoBatchesWithShardedKey(BatchingParams<V> batchingParams) {
+    private BatchGroupIntoBatchesWithShardedKey(
+        BatchingParams<V> batchingParams,
+        DataflowRunner runner,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput) {
       this.batchingParams = batchingParams;
+      this.runner = runner;
+      this.originalOutput = originalOutput;
     }

     @Override
     public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      return shardKeys(input).apply(new BatchGroupIntoBatches<>(batchingParams));
+      return shardKeys(input)
+          .apply(new BatchGroupIntoBatches<>(batchingParams, runner, originalOutput));
     }
   }
+
+  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> {
+
+    private final DataflowRunner runner;
+
+    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new StreamingGroupIntoBatches<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The
+   * override does the same thing as the original transform but additionally records the output in
+   * order to append required step properties during the graph translation.
+   */
+  static class StreamingGroupIntoBatches<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    private final transient DataflowRunner runner;
Review comment:
Any downside to keeping a pointer to the runner object in the PTransform
? I think this might be OK since this is a Dataflow specific transform but
wanted to point this out.
cc: @kennknowles
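
To make the serialization trade-off concrete: because both new fields are transient, they are only usable while the pipeline is constructed and translated, and read back as null after a Java serialization round trip. A standalone sketch of that behavior (all names here are illustrative, not from the PR):

    import java.io.*;

    public class TransientPointerDemo {
      // Stand-in for a PTransform that keeps a transient reference to its runner.
      // Marking the field transient also avoids a NotSerializableException when
      // the referenced object (like a runner) is not Serializable itself.
      static class FakeTransform implements Serializable {
        final transient Object runner;

        FakeTransform(Object runner) {
          this.runner = runner;
        }
      }

      public static void main(String[] args) throws Exception {
        FakeTransform before = new FakeTransform(new Object());

        // Round-trip through Java serialization, as happens when a transform
        // is shipped off the submission host.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(before);
        }
        FakeTransform after;
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          after = (FakeTransform) in.readObject();
        }

        System.out.println(before.runner != null); // true: available at construction time
        System.out.println(after.runner == null);  // true: gone after deserialization
      }
    }

So the pointer looks safe as long as the translator consumes it during graph construction, before anything is serialized; if it were ever touched at execution time it would silently be null.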
##########
File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
##########
@@ -1133,6 +1157,55 @@ private JobSpecification runStreamingGroupIntoBatchesAndGetJobSpec(
         pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList());
   }

+  @Test
+  public void testBatchGroupIntoBatchesTranslation() throws Exception {
+    JobSpecification jobSpec =
+        runBatchGroupIntoBatchesAndGetJobSpec(false, Collections.emptyList());
+    List<Step> steps = jobSpec.getJob().getSteps();
+    Step shardedStateStep = steps.get(steps.size() - 1);
+    Map<String, Object> properties = shardedStateStep.getProperties();
+    assertTrue(properties.containsKey(PropertyNames.PRESERVES_KEYS));
+    assertEquals("true", getString(properties, PropertyNames.PRESERVES_KEYS));
+  }
+
+  @Test
+  public void testBatchGroupIntoBatchesWithShardedKeyTranslation() throws Exception {
+    List<String> experiments = Collections.emptyList();
+    JobSpecification jobSpec = runBatchGroupIntoBatchesAndGetJobSpec(true, experiments);
+    List<Step> steps = jobSpec.getJob().getSteps();
+    Step shardedStateStep = steps.get(steps.size() - 1);
+    Map<String, Object> properties = shardedStateStep.getProperties();
+    assertTrue(properties.containsKey(PropertyNames.PRESERVES_KEYS));
+    assertEquals("true", getString(properties, PropertyNames.PRESERVES_KEYS));
+  }
+
+  @Test
+  public void testBatchGroupIntoBatchesTranslationUnifiedWorker() throws Exception {
Review comment:
Note that the Java unified worker (UW) uses Portable Job Submission by default, so you will probably need to update the Dataflow service (beam.cc) as well.
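
For context on the truncation above: the hunk cuts off at the method signature, and the body presumably mirrors testBatchGroupIntoBatchesTranslation with unified-worker experiments enabled. A sketch under that assumption (the experiment strings are guesses, not confirmed by the diff):

    @Test
    public void testBatchGroupIntoBatchesTranslationUnifiedWorker() throws Exception {
      // Guessed flags; the actual experiment list is not visible in this hunk.
      List<String> experiments = Arrays.asList("use_runner_v2", "use_unified_worker");
      JobSpecification jobSpec = runBatchGroupIntoBatchesAndGetJobSpec(false, experiments);
      List<Step> steps = jobSpec.getJob().getSteps();
      Map<String, Object> properties = steps.get(steps.size() - 1).getProperties();
      assertEquals("true", getString(properties, PropertyNames.PRESERVES_KEYS));
    }

If the properties are attached only during Java-side translation, a job submitted through the portable path would miss them, which is why the service-side (beam.cc) change matters.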