[
https://issues.apache.org/jira/browse/BEAM-10475?focusedWorklogId=546486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-546486
]
ASF GitHub Bot logged work on BEAM-10475:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Feb/21 01:09
Start Date: 03/Feb/21 01:09
Worklog Time Spent: 10m
Work Description: boyuanzz commented on a change in pull request #13805:
URL: https://github.com/apache/beam/pull/13805#discussion_r568346093
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1365,21 +1365,27 @@ void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
   }
   void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
-    if (hasExperiment(options, "beam_fn_api")) {
+    if (!options.isEnableStreamingEngine()) {
       LOG.warn(
-          "Runner determined sharding not available in Dataflow for GroupIntoBatches for portable "
-              + "jobs. Default sharding will be applied.");
+          "Runner determined sharding not available in Dataflow for GroupIntoBatches for "
+              + "non-Streaming Engine jobs. Default sharding will be applied.");
       return;
     }
-    if (!options.isEnableStreamingEngine()) {
+    if (!hasExperiment(options, "enable_streaming_auto_sharding")) {
       LOG.warn(
-          "Runner determined sharding not available in Dataflow for GroupIntoBatches for Streaming "
-              + "Appliance jobs. Default sharding will be applied.");
+          "Runner determined sharding not enabled in Dataflow for GroupIntoBatches for Streaming "
+              + "Engine jobs: --enable_streaming_auto_sharding=false. Default sharding will be "
+              + "applied.");
       return;
     }
-    if (hasExperiment(options, "enable_streaming_auto_sharding")) {
-      pcollectionsRequiringAutoSharding.add(pcol);
+    if (hasExperiment(options, "beam_fn_api") && !hasExperiment(options, "use_runner_v2")) {
+      LOG.warn(
+          "Runner determined sharding not available in Dataflow for GroupIntoBatches for portable "
+              + "jobs not using runner v2: --beam_fn_api=true, --use_runner_v2=false. Default "
+              + "sharding will be applied.");
+      return;
Review comment:
Will a warning message be enough here? Should we reject the job instead? (I
should have commented on this earlier. Sorry that I forgot to do so : ( )
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1365,21 +1365,27 @@ void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
   }
   void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
-    if (hasExperiment(options, "beam_fn_api")) {
+    if (!options.isEnableStreamingEngine()) {
       LOG.warn(
-          "Runner determined sharding not available in Dataflow for GroupIntoBatches for portable "
-              + "jobs. Default sharding will be applied.");
+          "Runner determined sharding not available in Dataflow for GroupIntoBatches for "
+              + "non-Streaming Engine jobs. Default sharding will be applied.");
       return;
     }
-    if (!options.isEnableStreamingEngine()) {
+    if (!hasExperiment(options, "enable_streaming_auto_sharding")) {
       LOG.warn(
-          "Runner determined sharding not available in Dataflow for GroupIntoBatches for Streaming "
-              + "Appliance jobs. Default sharding will be applied.");
+          "Runner determined sharding not enabled in Dataflow for GroupIntoBatches for Streaming "
+              + "Engine jobs: --enable_streaming_auto_sharding=false. Default sharding will be "
+              + "applied.");
       return;
     }
-    if (hasExperiment(options, "enable_streaming_auto_sharding")) {
-      pcollectionsRequiringAutoSharding.add(pcol);
+    if (hasExperiment(options, "beam_fn_api") && !hasExperiment(options, "use_runner_v2")) {
+      LOG.warn(
+          "Runner determined sharding not available in Dataflow for GroupIntoBatches for portable "
+              + "jobs not using runner v2: --beam_fn_api=true, --use_runner_v2=false. Default "
+              + "sharding will be applied.");
+      return;
Review comment:
I would prefer rejecting the pipeline, since it is also bad when a customer
expects auto-sharding but it is not applied.
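To make the suggestion above concrete, here is a minimal, hypothetical sketch of what rejecting the job (rather than warning and falling back to default sharding) could look like. The class, method names, and boolean/set parameters are illustrative stand-ins for the real `DataflowPipelineOptions` and experiment handling, and `checkArgument` is a small local version of the Guava-style helper so the sketch is self-contained; this is not the actual DataflowRunner code.

```java
import java.util.Set;

class AutoShardingValidation {
  // Hypothetical validation: throw instead of LOG.warn + return, so a
  // pipeline that requested runner-determined sharding fails fast when the
  // required options are missing.
  static void validateAutoSharding(boolean streamingEngine, Set<String> experiments) {
    checkArgument(
        streamingEngine,
        "Runner determined sharding for GroupIntoBatches requires Streaming Engine.");
    checkArgument(
        experiments.contains("enable_streaming_auto_sharding"),
        "Runner determined sharding for GroupIntoBatches requires the "
            + "enable_streaming_auto_sharding experiment.");
  }

  // Minimal Guava-style checkArgument so the sketch compiles standalone.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

The trade-off under discussion: a warning silently degrades to default sharding, while a thrown `IllegalArgumentException` surfaces the misconfiguration at job-submission time.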
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -1365,21 +1365,22 @@ void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
   }
   void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
-    if (hasExperiment(options, "beam_fn_api")) {
-      LOG.warn(
-          "Runner determined sharding not available in Dataflow for GroupIntoBatches for portable "
-              + "jobs. Default sharding will be applied.");
-      return;
-    }
-    if (!options.isEnableStreamingEngine()) {
-      LOG.warn(
-          "Runner determined sharding not available in Dataflow for GroupIntoBatches for Streaming "
-              + "Appliance jobs. Default sharding will be applied.");
-      return;
-    }
-    if (hasExperiment(options, "enable_streaming_auto_sharding")) {
-      pcollectionsRequiringAutoSharding.add(pcol);
-    }
+    // Auto-sharding is only supported in Streaming Engine.
+    checkArgument(
+        options.isEnableStreamingEngine(),
+        "Runner determined sharding not available in Dataflow for GroupIntoBatches for"
Review comment:
The error messages here and below sound confusing to me. If we find that the
pipeline options are not valid, how about we give the correct pipeline
options in the message? We can make this improvement in a separate PR.
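As a sketch of the actionable-error-message idea above: build the message from the options that are actually missing, so the user sees exactly what to change. The class, method, and flag spellings below are illustrative assumptions, not the actual DataflowRunner API or its final wording.

```java
class ShardingErrorMessages {
  // Hypothetical helper: list only the options the user still needs to set.
  static String autoShardingUnavailable(boolean streamingEngine, boolean runnerV2) {
    StringBuilder msg =
        new StringBuilder(
            "Runner determined sharding is not available for GroupIntoBatches "
                + "with the current pipeline options. Required:");
    if (!streamingEngine) {
      // Flag name is an assumption for illustration.
      msg.append(" --enableStreamingEngine (currently disabled);");
    }
    if (!runnerV2) {
      // Experiment name taken from the diff above.
      msg.append(" --experiments=use_runner_v2 (currently unset);");
    }
    return msg.toString();
  }
}
```

Compared with a generic "not available" warning, a message shaped like this tells the user which option to flip without digging through runner source.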
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 546486)
Time Spent: 28h 50m (was: 28h 40m)
> GroupIntoBatches with Runner-determined Sharding
> ------------------------------------------------
>
> Key: BEAM-10475
> URL: https://issues.apache.org/jira/browse/BEAM-10475
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Siyuan Chen
> Assignee: Siyuan Chen
> Priority: P2
> Labels: GCP, performance
> Time Spent: 28h 50m
> Remaining Estimate: 0h
>
> https://s.apache.org/sharded-group-into-batches
> Improve the existing Beam transform, GroupIntoBatches, to allow runners to
> choose different sharding strategies depending on how the data needs to be
> grouped. The goal is to help with the situation where the elements to process
> need to be co-located to reduce the overhead that would otherwise be incurred
> per element, while not losing the ability to scale the parallelism. The
> essential idea is to build a stateful DoFn with shardable states.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)