This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 02ac93ed697 Add DiskProvisionedIops/ThroughputMibps pipeline options 
for the Python SDK (#38370)
02ac93ed697 is described below

commit 02ac93ed6977b52be21c9aef2924b6451a18c8c9
Author: bambadiouf1 <[email protected]>
AuthorDate: Thu May 7 14:13:48 2026 -0700

    Add DiskProvisionedIops/ThroughputMibps pipeline options for the Python SDK 
(#38370)
    
    * Restore Java and Go changes for disk provisioned IOPS and throughput
    
    * Add CHANGES.md entry for disk provisioned IOPS and throughput
    
    * restore go changes
    
    * initialize options map in dataflow job to prevent nil pointer exceptions
    
    * go fmt
    
    * add testDiskProvisionedOptionsConfig unit test
    
    * Update pr id in changes.md
    
    * add disk_provisioned_iops and disk_provisioned_throughput_mibps options 
to  pipeline configuration
    
    * add Python support to disk IOPS and throughput pipeline options in 
CHANGES.md
    
    * revert manual changes
    
    * ran gen_client
    
    * gen client
    
    * trigger postcommit_python
    
    * Delete R74
    
    * undo gen_client deletion
    
    * delete change to trigger postcom
---
 CHANGES.md                                         |  3 +-
 .../python/apache_beam/options/pipeline_options.py | 18 ++++
 .../apache_beam/options/pipeline_options_test.py   |  6 ++
 .../runners/dataflow/internal/apiclient.py         |  5 ++
 .../runners/dataflow/internal/apiclient_test.py    | 17 ++++
 .../clients/dataflow/dataflow_v1b3_messages.py     | 99 +++++++++++++---------
 6 files changed, 104 insertions(+), 44 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 922c38ffef3..0db7fddba4f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,8 +70,7 @@
 
 ## New Features / Improvements
 
-* Added support for setting disk provisioned IOPS and throughput in Dataflow 
runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` 
pipeline options (Java/Go) 
([#38349](https://github.com/apache/beam/issues/38349)).
-* X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Added support for setting disk provisioned IOPS and throughput in Dataflow 
runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` 
pipeline options (Java/Go) or `--disk_provisioned_iops` and 
`--disk_provisioned_throughput_mibps` (Python) 
([#38349](https://github.com/apache/beam/issues/38349)).
 * TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
   encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
   compatible. Both coders can decode encoded bytes from the other coder
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index a79bddb21ab..265313cd013 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1402,6 +1402,24 @@ class WorkerOptions(PipelineOptions):
         dest='disk_type',
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
+    parser.add_argument(
+        '--disk_provisioned_iops',
+        type=int,
+        default=None,
+        dest='disk_provisioned_iops',
+        help=(
+            'The provisioned IOPS of the disk. If not set, the Dataflow 
service'
+            ' will choose a reasonable default.'),
+    )
+    parser.add_argument(
+        '--disk_provisioned_throughput_mibps',
+        type=int,
+        default=None,
+        dest='disk_provisioned_throughput_mibps',
+        help=(
+            'The provisioned throughput of the disk in MiB/s. If not set, the'
+            ' Dataflow service will choose a reasonable default.'),
+    )
     parser.add_argument(
         '--worker_region',
         default=None,
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py 
b/sdks/python/apache_beam/options/pipeline_options_test.py
index 215c44156ea..901f56b99cb 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -444,12 +444,18 @@ class PipelineOptionsTest(unittest.TestCase):
         'abc',
         '--disk_type',
         'def',
+        '--disk_provisioned_iops',
+        '4000',
+        '--disk_provisioned_throughput_mibps',
+        '200',
         '--element_processing_timeout_minutes',
         '10',
     ])
     worker_options = options.view_as(WorkerOptions)
     self.assertEqual(worker_options.machine_type, 'abc')
     self.assertEqual(worker_options.disk_type, 'def')
+    self.assertEqual(worker_options.disk_provisioned_iops, 4000)
+    self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200)
     self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
 
     options = PipelineOptions(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 097523a5131..4a7c61901de 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -208,6 +208,11 @@ class Environment(object):
       pool.diskSizeGb = self.worker_options.disk_size_gb
     if self.worker_options.disk_type:
       pool.diskType = self.worker_options.disk_type
+    if self.worker_options.disk_provisioned_iops is not None:
+      pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops
+    if self.worker_options.disk_provisioned_throughput_mibps is not None:
+      pool.diskProvisionedThroughputMibps = (
+          self.worker_options.disk_provisioned_throughput_mibps)
     if self.worker_options.zone:
       pool.zone = self.worker_options.zone
     if self.worker_options.network:
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 43f51d0b39f..909ce9fd117 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -644,6 +644,23 @@ class UtilTest(unittest.TestCase):
                                 FAKE_PIPELINE_URL)
     self.assertEqual(env.proto.workerPools[0].numThreadsPerWorker, 2)
 
+  def test_disk_provisioning_options(self):
+    pipeline_options = PipelineOptions([
+        '--temp_location',
+        'gs://any-location/temp',
+        '--disk_provisioned_iops',
+        '4000',
+        '--disk_provisioned_throughput_mibps',
+        '200'
+    ])
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL)
+    self.assertEqual(env.proto.workerPools[0].diskProvisionedIops, 4000)
+    self.assertEqual(
+        env.proto.workerPools[0].diskProvisionedThroughputMibps, 200)
+
   @mock.patch(
       'apache_beam.runners.dataflow.internal.apiclient.'
       'beam_version.__version__',
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
index 0c096e73c1a..582fb30b57b 100644
--- 
a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ 
b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -2634,8 +2634,8 @@ class FlexTemplateRuntimeEnvironment(_messages.Message):
     ipConfiguration: Configuration for VM IPs.
     kmsKeyName: Name for the Cloud KMS key for the job. Key format is:
       projects//locations//keyRings//cryptoKeys/
-    launcherMachineType: The machine type to use for launching the job. The
-      default is n1-standard-1.
+    launcherMachineType: The machine type to use for launching the job. If not
+      set, Dataflow will select a default machine type.
     machineType: The machine type to use for the job. Defaults to the value
       from the template if not specified.
     maxWorkers: The maximum number of Google Compute Engine instances to be
@@ -3209,6 +3209,7 @@ class Job(_messages.Message):
       attempts to create a job with the same name as an active job that
       already exists, the attempt returns the existing job. The name must
       match the regular expression `[a-z]([-a-z0-9]{0,1022}[a-z0-9])?`
+    pausable: Output only. Indicates whether the job can be paused.
     pipelineDescription: Preliminary field: The format of this data may change
       at any time. A description of the user pipeline and stages through which
       it is executed. Created by Cloud Dataflow service. Only retrieved with
@@ -3498,22 +3499,23 @@ class Job(_messages.Message):
   labels = _messages.MessageField('LabelsValue', 10)
   location = _messages.StringField(11)
   name = _messages.StringField(12)
-  pipelineDescription = _messages.MessageField('PipelineDescription', 13)
-  projectId = _messages.StringField(14)
-  replaceJobId = _messages.StringField(15)
-  replacedByJobId = _messages.StringField(16)
-  requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 17)
-  runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 18)
-  satisfiesPzi = _messages.BooleanField(19)
-  satisfiesPzs = _messages.BooleanField(20)
-  serviceResources = _messages.MessageField('ServiceResources', 21)
-  stageStates = _messages.MessageField('ExecutionStageState', 22, 
repeated=True)
-  startTime = _messages.StringField(23)
-  steps = _messages.MessageField('Step', 24, repeated=True)
-  stepsLocation = _messages.StringField(25)
-  tempFiles = _messages.StringField(26, repeated=True)
-  transformNameMapping = _messages.MessageField('TransformNameMappingValue', 
27)
-  type = _messages.EnumField('TypeValueValuesEnum', 28)
+  pausable = _messages.BooleanField(13)
+  pipelineDescription = _messages.MessageField('PipelineDescription', 14)
+  projectId = _messages.StringField(15)
+  replaceJobId = _messages.StringField(16)
+  replacedByJobId = _messages.StringField(17)
+  requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 18)
+  runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 19)
+  satisfiesPzi = _messages.BooleanField(20)
+  satisfiesPzs = _messages.BooleanField(21)
+  serviceResources = _messages.MessageField('ServiceResources', 22)
+  stageStates = _messages.MessageField('ExecutionStageState', 23, 
repeated=True)
+  startTime = _messages.StringField(24)
+  steps = _messages.MessageField('Step', 25, repeated=True)
+  stepsLocation = _messages.StringField(26)
+  tempFiles = _messages.StringField(27, repeated=True)
+  transformNameMapping = _messages.MessageField('TransformNameMappingValue', 
28)
+  type = _messages.EnumField('TypeValueValuesEnum', 29)
 
 
 class JobExecutionDetails(_messages.Message):
@@ -5342,8 +5344,14 @@ class RuntimeUpdatableParams(_messages.Message):
   during job creation.
 
   Fields:
-    acceptableBacklogDuration: Optional. The backlog threshold duration in
-      seconds for autoscaling. Value must be non-negative.
+    acceptableBacklogDuration: Optional. Deprecated: Use `latency_tier`
+      instead. The backlog threshold duration in seconds for autoscaling.
+      Value must be non-negative.
+    autoscalingTier: Optional. Deprecated: Use `latency_tier` instead. The
+      backlog threshold tier for autoscaling. Value must be one of "low-
+      latency", "medium-latency", or "high-latency".
+    latencyTier: Optional. The backlog threshold tier for autoscaling. Value
+      must be one of "low-latency", "medium-latency", or "high-latency".
     maxNumWorkers: The maximum number of workers to cap autoscaling at. This
       field is currently only supported for Streaming Engine jobs.
     minNumWorkers: The minimum number of workers to scale down to. This field
@@ -5357,9 +5365,11 @@ class RuntimeUpdatableParams(_messages.Message):
   """
 
   acceptableBacklogDuration = _messages.StringField(1)
-  maxNumWorkers = _messages.IntegerField(2, variant=_messages.Variant.INT32)
-  minNumWorkers = _messages.IntegerField(3, variant=_messages.Variant.INT32)
-  workerUtilizationHint = _messages.FloatField(4)
+  autoscalingTier = _messages.StringField(2)
+  latencyTier = _messages.StringField(3)
+  maxNumWorkers = _messages.IntegerField(4, variant=_messages.Variant.INT32)
+  minNumWorkers = _messages.IntegerField(5, variant=_messages.Variant.INT32)
+  workerUtilizationHint = _messages.FloatField(6)
 
 
 class SDKInfo(_messages.Message):
@@ -7775,6 +7785,9 @@ class WorkerPool(_messages.Message):
     defaultPackageSet: The default package set to install. This allows the
       service to select a default set of packages which are useful to worker
       harnesses written in a particular language.
+    diskProvisionedIops: Optional. IOPS provisioned for the root disk for VMs.
+    diskProvisionedThroughputMibps: Optional. Throughput provisioned for the
+      root disk for VMs.
     diskSizeGb: Size of root disk for VMs, in GB. If zero or unspecified, the
       service will attempt to choose a reasonable default.
     diskSourceImage: Fully qualified source image for disks.
@@ -7938,25 +7951,27 @@ class WorkerPool(_messages.Message):
   autoscalingSettings = _messages.MessageField('AutoscalingSettings', 1)
   dataDisks = _messages.MessageField('Disk', 2, repeated=True)
   defaultPackageSet = _messages.EnumField('DefaultPackageSetValueValuesEnum', 
3)
-  diskSizeGb = _messages.IntegerField(4, variant=_messages.Variant.INT32)
-  diskSourceImage = _messages.StringField(5)
-  diskType = _messages.StringField(6)
-  ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 7)
-  kind = _messages.StringField(8)
-  machineType = _messages.StringField(9)
-  metadata = _messages.MessageField('MetadataValue', 10)
-  network = _messages.StringField(11)
-  numThreadsPerWorker = _messages.IntegerField(12, 
variant=_messages.Variant.INT32)
-  numWorkers = _messages.IntegerField(13, variant=_messages.Variant.INT32)
-  onHostMaintenance = _messages.StringField(14)
-  packages = _messages.MessageField('Package', 15, repeated=True)
-  poolArgs = _messages.MessageField('PoolArgsValue', 16)
-  sdkHarnessContainerImages = 
_messages.MessageField('SdkHarnessContainerImage', 17, repeated=True)
-  subnetwork = _messages.StringField(18)
-  taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 19)
-  teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20)
-  workerHarnessContainerImage = _messages.StringField(21)
-  zone = _messages.StringField(22)
+  diskProvisionedIops = _messages.IntegerField(4)
+  diskProvisionedThroughputMibps = _messages.IntegerField(5)
+  diskSizeGb = _messages.IntegerField(6, variant=_messages.Variant.INT32)
+  diskSourceImage = _messages.StringField(7)
+  diskType = _messages.StringField(8)
+  ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 9)
+  kind = _messages.StringField(10)
+  machineType = _messages.StringField(11)
+  metadata = _messages.MessageField('MetadataValue', 12)
+  network = _messages.StringField(13)
+  numThreadsPerWorker = _messages.IntegerField(14, 
variant=_messages.Variant.INT32)
+  numWorkers = _messages.IntegerField(15, variant=_messages.Variant.INT32)
+  onHostMaintenance = _messages.StringField(16)
+  packages = _messages.MessageField('Package', 17, repeated=True)
+  poolArgs = _messages.MessageField('PoolArgsValue', 18)
+  sdkHarnessContainerImages = 
_messages.MessageField('SdkHarnessContainerImage', 19, repeated=True)
+  subnetwork = _messages.StringField(20)
+  taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 21)
+  teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 22)
+  workerHarnessContainerImage = _messages.StringField(23)
+  zone = _messages.StringField(24)
 
 
 class WorkerSettings(_messages.Message):

Reply via email to