[ https://issues.apache.org/jira/browse/BEAM-13225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brent Worden updated BEAM-13225:
--------------------------------
    Description: 
I have a classic Dataflow template written using the Apache Beam Java SDK 
v2.32.0.  The template simply consumes messages from a Pub/Sub subscription and 
writes them to Google Cloud Storage.

The template can successfully be used to run jobs with [Dataflow 
Prime|https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime] 
experimental features enabled through {{\-\-additional-experiments 
enable_prime}} and by providing a pipeline-level resource hint through 
{{\-\-parameters=resourceHints=min_ram=8GiB}}:
{code}
gcloud dataflow jobs run my-job-name \
  --additional-experiments enable_prime \
  --disable-public-ips \
  --gcs-location gs://bucket/path/to/template \
  --num-workers 1 \
  --max-workers 16 \
  --parameters=resourceHints=min_ram=8GiB,other_pipeline_options=true \
  --project my-project \
  --region us-central1 \
  --service-account-email [email protected] \
  --staging-location gs://bucket/path/to/staging \
  --subnetwork https://www.googleapis.com/compute/v1/projects/my-project/regions/us-central1/subnetworks/my-subnet
{code}
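
For reference, the pipeline-level hint above should correspond to Beam's 
documented {{\-\-resourceHints}} pipeline option for right fitting; a minimal 
sketch of setting it in code instead (the class name is hypothetical):
{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PrimePipelineSketch {
  public static void main(String[] args) {
    // Sketch only: --resourceHints sets the min_ram hint pipeline-wide,
    // equivalent to the resourceHints template parameter above.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs("--resourceHints=min_ram=8GiB")
            .withValidation()
            .create();
    Pipeline pipeline = Pipeline.create(options);
    // ... construct the Pub/Sub-to-GCS pipeline here, then pipeline.run() ...
  }
}
{code}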

In an attempt to use Dataflow Prime's [Right 
Fitting|https://cloud.google.com/dataflow/docs/guides/right-fitting] 
capability, I changed the pipeline code to include a resource hint on the 
FileIO transform:
{code}
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.values.PCollection;

class WriteGcsFileTransform
    extends PTransform<PCollection<Input>, WriteFilesResult<Destination>> {

  private static final long serialVersionUID = 1L;

  @Override
  public WriteFilesResult<Destination> expand(PCollection<Input> input) {
    return input.apply(
        FileIO.<Destination, Input>writeDynamic()
            .by(myDynamicDestinationFunction)
            .withDestinationCoder(Destination.coder())
            .withNumShards(8)
            .withNaming(myDestinationFileNamingFunction)
            .withTempDirectory("gs://bucket/path/to/temp")
            .withCompression(Compression.GZIP)
            // Step-level hint intended for Dataflow Prime right fitting.
            .setResourceHints(ResourceHints.create().withMinRam("32GiB")));
  }
}
{code}
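
Since {{setResourceHints}} is defined on {{PTransform}} itself, the hint can 
presumably also be attached where the composite transform is applied; a 
minimal sketch of that variant:
{code}
// Sketch only: the same hint, attached at the apply() call site rather than
// inside expand().
input.apply(
    "WriteToGcs",
    new WriteGcsFileTransform()
        .setResourceHints(ResourceHints.create().withMinRam("32GiB")));
{code}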

Attempting to run jobs from a template built from the new code results in a 
continuous crash loop, and the job never successfully runs.  The only error 
log entry, repeated on every restart, is:
{code}
{
  "insertId": "s=97e1ecd30e0243609d555685318325b4;i=4e1;b=6c7f5d65f3994eada5f20672dab1daf1;m=912f16c;t=5d024689cb030;x=b36751718b3d80c1",
  "jsonPayload": {
    "line": "pod_workers.go:191",
    "message": "Error syncing pod 4cf7cbf98df4b5e2d054abce7da1262b (\"df-df-hvm-my-job-name-11061310-qn51-harness-jb9f_default(4cf7c6bf982df4b5eb2d054abce7da12)\"), skipping: failed to \"StartContainer\" for \"artifact\" with CrashLoopBackOff: \"back-off 40s restarting failed container=artifact pod=df-df-hvm-my-job-name-11061310-qn51-harness-jb9f_default(4cf7c6bf982df4b5eb2d054abce7da12)\"",
    "thread": "807"
  },
  "resource": {
    "type": "dataflow_step",
    "labels": {
      "project_id": "my-project",
      "region": "us-central1",
      "step_id": "",
      "job_id": "2021-11-06_12_10_27-510057810808146686",
      "job_name": "my-job-name"
    }
  },
  "timestamp": "2021-11-06T20:14:36.052491Z",
  "severity": "ERROR",
  "labels": {
    "compute.googleapis.com/resource_type": "instance",
    "dataflow.googleapis.com/log_type": "system",
    "compute.googleapis.com/resource_id": "4695846446965678007",
    "dataflow.googleapis.com/job_name": "my-job-name",
    "dataflow.googleapis.com/job_id": "2021-11-06_12_10_27-510057810808146686",
    "dataflow.googleapis.com/region": "us-central1",
    "dataflow.googleapis.com/service_option": "prime",
    "compute.googleapis.com/resource_name": "df-hvm-my-job-name-11061310-qn51-harness-jb9f"
  },
  "logName": "projects/my-project/logs/dataflow.googleapis.com%2Fkubelet",
  "receiveTimestamp": "2021-11-06T20:14:46.471285909Z"
}
{code}

If the pipeline-level resource hint and the step-level resource hint are both 
set to 8GiB, the pipeline fails with the same repeated error.
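
The only change in that case is the hint value inside {{expand}} (a sketch of 
the single differing line):
{code}
// Step-level hint lowered to match the 8GiB pipeline-level hint; same failure.
.setResourceHints(ResourceHints.create().withMinRam("8GiB"))
{code}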

> Dataflow Prime job fails when providing resource hints on a transform
> ---------------------------------------------------------------------
>
>                 Key: BEAM-13225
>                 URL: https://issues.apache.org/jira/browse/BEAM-13225
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, sdk-java-core
>    Affects Versions: 2.32.0
>            Reporter: Brent Worden
>            Priority: P2
>


