Brent Worden created BEAM-13225:
-----------------------------------
Summary: Dataflow Prime job fails when providing resource hints on a transform
Key: BEAM-13225
URL: https://issues.apache.org/jira/browse/BEAM-13225
Project: Beam
Issue Type: Bug
Components: runner-dataflow, sdk-java-core
Affects Versions: 2.32.0
Reporter: Brent Worden
I have a classic Dataflow template written using the Apache Beam Java SDK
v2.32.0. The template simply consumes messages from a Pub/Sub subscription and
writes them to Google Cloud Storage.
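For context, the pipeline is roughly the following (a minimal sketch; `ParseInputFn` and the `getSubscription()` option accessor are illustrative placeholders, not the exact template code):
```lang-java
// Minimal sketch of the template pipeline; ParseInputFn and
// options.getSubscription() are illustrative placeholders.
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("ReadPubsub",
        PubsubIO.readStrings().fromSubscription(options.getSubscription()))
    .apply("Window",
        Window.into(FixedWindows.of(Duration.standardMinutes(5))))
    .apply("ParseToInput", ParDo.of(new ParseInputFn()))
    .apply("WriteGcsFiles", new WriteGcsFileTransform());
pipeline.run();
```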
The template can successfully be used to run jobs with [Dataflow Prime][1] experimental features enabled by passing `--additional-experiments enable_prime` and a pipeline-level resource hint via `--parameters=resourceHints=min_ram=8GiB`:
```lang-sh
gcloud dataflow jobs run my-job-name \
--additional-experiments enable_prime \
--disable-public-ips \
--gcs-location gs://bucket/path/to/template \
--num-workers 1 \
--max-workers 16 \
--parameters=resourceHints=min_ram=8GiB,other_pipeline_options=true \
--project my-project \
--region us-central1 \
--service-account-email [email protected] \
--staging-location gs://bucket/path/to/staging \
--subnetwork https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnet
```
In an attempt to use Dataflow Prime's [Right Fitting][2] capability, I changed the pipeline code to include a resource hint on the FileIO transform:
```lang-java
class WriteGcsFileTransform
    extends PTransform<PCollection<Input>, WriteFilesResult<Destination>> {

  private static final long serialVersionUID = 1L;

  @Override
  public WriteFilesResult<Destination> expand(PCollection<Input> input) {
    return input.apply(
        FileIO.<Destination, Input>writeDynamic()
            .by(myDynamicDestinationFunction)
            .withDestinationCoder(Destination.coder())
            .withNumShards(8)
            .withNaming(myDestinationFileNamingFunction)
            .withTempDirectory("gs://bucket/path/to/temp")
            .withCompression(Compression.GZIP)
            // Transform-level resource hint for Dataflow Prime Right Fitting.
            .setResourceHints(ResourceHints.create().withMinRam("32GiB")));
  }
}
```
Attempting to run jobs from a template built from the new code results in a continuous crash loop, and the job never successfully runs. The only repeated error log entry is:
```lang-json
{
  "insertId": "s=97e1ecd30e0243609d555685318325b4;i=4e1;b=6c7f5d65f3994eada5f20672dab1daf1;m=912f16c;t=5d024689cb030;x=b36751718b3d80c1",
  "jsonPayload": {
    "line": "pod_workers.go:191",
    "message": "Error syncing pod 4cf7cbf98df4b5e2d054abce7da1262b (\"df-df-hvm-my-job-name-11061310-qn51-harness-jb9f_default(4cf7c6bf982df4b5eb2d054abce7da12)\"), skipping: failed to \"StartContainer\" for \"artifact\" with CrashLoopBackOff: \"back-off 40s restarting failed container=artifact pod=df-df-hvm-my-job-name-11061310-qn51-harness-jb9f_default(4cf7c6bf982df4b5eb2d054abce7da12)\"",
    "thread": "807"
  },
  "resource": {
    "type": "dataflow_step",
    "labels": {
      "project_id": "my-project",
      "region": "us-central1",
      "step_id": "",
      "job_id": "2021-11-06_12_10_27-510057810808146686",
      "job_name": "my-job-name"
    }
  },
  "timestamp": "2021-11-06T20:14:36.052491Z",
  "severity": "ERROR",
  "labels": {
    "compute.googleapis.com/resource_type": "instance",
    "dataflow.googleapis.com/log_type": "system",
    "compute.googleapis.com/resource_id": "4695846446965678007",
    "dataflow.googleapis.com/job_name": "my-job-name",
    "dataflow.googleapis.com/job_id": "2021-11-06_12_10_27-510057810808146686",
    "dataflow.googleapis.com/region": "us-central1",
    "dataflow.googleapis.com/service_option": "prime",
    "compute.googleapis.com/resource_name": "df-hvm-my-job-name-11061310-qn51-harness-jb9f"
  },
  "logName": "projects/my-project/logs/dataflow.googleapis.com%2Fkubelet",
  "receiveTimestamp": "2021-11-06T20:14:46.471285909Z"
}
```
Am I using resource hints on the transform incorrectly?
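For reference, since `setResourceHints` is defined on `PTransform`, the only other placement I can think of is hinting the composite itself at apply time. A sketch of that variant (not something I have confirmed behaves differently):
```lang-java
// Variant: attach the hint to the outer composite transform instead of
// the inner FileIO write; setResourceHints is inherited from PTransform.
input.apply(
    "WriteGcsFiles",
    new WriteGcsFileTransform()
        .setResourceHints(ResourceHints.create().withMinRam("32GiB")));
```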
[1]: https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime
[2]: https://cloud.google.com/dataflow/docs/guides/right-fitting