This is an automated email from the ASF dual-hosted git repository.

ricardozanini pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new 913d221a5a2 [sonataflow-operator] incubator-kie-tools-3471: Implement 
natively PDB support for SonataFlow workflows and Data Index service 
deployments (#3475)
913d221a5a2 is described below

commit 913d221a5a26cd072521492851b26a544aee978c
Author: Walter Medvedeo <[email protected]>
AuthorDate: Thu Mar 5 17:32:52 2026 +0100

    [sonataflow-operator] incubator-kie-tools-3471: Implement natively PDB 
support for SonataFlow workflows and Data Index service deployments (#3475)
---
 packages/sonataflow-operator/Makefile              |  14 +-
 .../api/v1alpha08/podtemplate_types.go             |  32 ++-
 .../api/v1alpha08/sonataflow_types.go              |   5 +
 .../api/v1alpha08/zz_generated.deepcopy.go         |  36 ++++
 packages/sonataflow-operator/config/rbac/role.yaml |  12 ++
 .../internal/controller/platform/autoscaling.go    |  73 -------
 .../internal/controller/platform/k8s.go            |  80 ++++++--
 .../controller/platform/services/services.go       |  22 +++
 .../controller/profiles/common/object_creators.go  |  21 ++
 .../profiles/preview/deployment_handler.go         |   9 +
 .../profiles/preview/disruption_budget_handler.go  | 105 ++++++++++
 .../internal/controller/sonataflow_controller.go   |  55 +++++-
 .../controller/sonataflowplatform_controller.go    |  75 +++++---
 .../internal/controller/workflowdef/utils.go       |  12 ++
 packages/sonataflow-operator/operator.yaml         |  93 +++++++++
 packages/sonataflow-operator/test/e2e/helpers.go   | 124 +++++++++++-
 .../sonataflow-operator/test/e2e/platform_test.go  | 141 ++++++++++++++
 .../01-postgres.yaml                               |  89 +++++++++
 .../02-sonataflow_platform.yaml                    |  66 +++++++
 .../kustomization.yaml                             |  34 ++++
 .../01-postgres.yaml                               |  89 +++++++++
 .../02-sonataflow_platform.yaml                    |  65 +++++++
 .../03-data-index-service-hpa.yaml                 |  41 ++++
 .../kustomization.yaml                             |  35 ++++
 .../sonataflow-operator/test/e2e/workflow_test.go  | 214 +++++++++++++++++++++
 .../utils/kubernetes/autoscaling.go                | 126 ++++++++++++
 .../utils/kubernetes/deployment.go                 |  12 ++
 .../utils/kubernetes/disruption_budget.go          |  69 +++++++
 28 files changed, 1628 insertions(+), 121 deletions(-)

diff --git a/packages/sonataflow-operator/Makefile 
b/packages/sonataflow-operator/Makefile
index 306887d378d..bf7ac63d3a7 100644
--- a/packages/sonataflow-operator/Makefile
+++ b/packages/sonataflow-operator/Makefile
@@ -423,7 +423,7 @@ generate-all: generate generate-deploy bundle
        @$(MAKE) fmt
 
 .PHONY: test-e2e # You will need to have a Minikube/Kind cluster up and 
running to run this target, and run container-builder before the test
-label = "flows-ephemeral" # possible values are flows-ephemeral, 
flows-persistence, flows-monitoring, flows-hpa, platform, cluster
+label = "flows-ephemeral" # possible values are flows-ephemeral, 
flows-persistence, flows-monitoring, flows-hpa, flows-pdb-with-hpa, flows-pdb, 
platform, cluster
 test-e2e:
 ifeq ($(label), cluster)
        @echo "🌐 Running e2e tests for cluster..."
@@ -455,8 +455,18 @@ else ifeq ($(label), flows-hpa)
        go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go 
./test/e2e/workflow_test.go \
        -v -ginkgo.v -ginkgo.no-color -ginkgo.github-output 
-ginkgo.label-filter=$(label) \
        -ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m 
KUSTOMIZE=$(KUSTOMIZE);
+else ifeq ($(label), flows-pdb)
+       @echo "🔁 Running e2e tests for flows-pdb..."
+       go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go 
./test/e2e/workflow_test.go \
+       -v -ginkgo.v -ginkgo.no-color -ginkgo.github-output 
-ginkgo.label-filter=$(label) \
+       -ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m 
KUSTOMIZE=$(KUSTOMIZE);
+else ifeq ($(label), flows-pdb-with-hpa)
+       @echo "🔁 Running e2e tests for flows-pdb-with-hpa..."
+       go test ./test/e2e/e2e_suite_test.go ./test/e2e/helpers.go 
./test/e2e/workflow_test.go \
+       -v -ginkgo.v -ginkgo.no-color -ginkgo.github-output 
-ginkgo.label-filter=$(label) \
+       -ginkgo.junit-report=./e2e-test-report-workflow_test.xml -timeout 60m 
KUSTOMIZE=$(KUSTOMIZE);
 else
-       @echo "❌  Invalid label. Please use one of: cluster, platform, 
flows-ephemeral, flows-persistence, flows-monitoring, flows-hpa"
+       @echo "❌  Invalid label. Please use one of: cluster, platform, 
flows-ephemeral, flows-persistence, flows-monitoring, flows-hpa, flows-pdb, 
flows-pdb-with-hpa"
 endif
 
 .PHONY: full-test-e2e
diff --git a/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go 
b/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
index 99e7f9335e3..574dbedbc2f 100644
--- a/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
+++ b/packages/sonataflow-operator/api/v1alpha08/podtemplate_types.go
@@ -17,7 +17,10 @@
 
 package v1alpha08
 
-import corev1 "k8s.io/api/core/v1"
+import (
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/util/intstr"
+)
 
 // ContainerSpec is the container for the internal deployments based on the 
default Kubernetes Container API
 type ContainerSpec struct {
@@ -532,9 +535,27 @@ func (f *PodSpec) ToPodSpec() corev1.PodSpec {
        }
 }
 
-// PodTemplateSpec describes the desired custom Kubernetes PodTemplate 
definition for the deployed flow or service.
+// PodDisruptionBudgetSpec describes the Kubernetes pod disruption budget 
configuration for the SonataFlow and supporting
+// services pods.
+type PodDisruptionBudgetSpec struct {
+       // An eviction is allowed if at least "minAvailable" pods selected by
+       // "selector" will still be available after the eviction, i.e. even in 
the
+       // absence of the evicted pod.  So for example you can prevent all 
voluntary
+       // evictions by specifying "100%". This is a mutually exclusive setting 
with "maxUnavailable".
+       // +optional
+       MinAvailable *intstr.IntOrString `json:"minAvailable,omitempty" 
protobuf:"bytes,1,opt,name=minAvailable"`
+
+       // An eviction is allowed if at most "maxUnavailable" pods selected by
+       // "selector" are unavailable after the eviction, i.e. even in absence 
of
+       // the evicted pod. For example, one can prevent all voluntary evictions
+       // by specifying 0. This is a mutually exclusive setting with 
"minAvailable".
+       // +optional
+       MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty" 
protobuf:"bytes,3,opt,name=maxUnavailable"`
+}
+
+// PodTemplateSpec describes the desired custom Kubernetes PodTemplate 
definition for a service.
 //
-// The ContainerSpec describes the container where the actual flow or service 
is running. It will override any default definitions.
+// The ContainerSpec describes the container where the service is running. It 
will override any default definitions.
 // For example, to override the image one can use 
`.spec.podTemplate.container.image = my/image:tag`.
 type PodTemplateSpec struct {
        // Container is the Kubernetes container where the application should 
run.
@@ -545,4 +566,9 @@ type PodTemplateSpec struct {
        PodSpec `json:",inline"`
        // +optional
        Replicas *int32 `json:"replicas,omitempty"`
+       // Defines the Kubernetes PodDisruptionBudgetSpec for this service. 
When configured, the SonataFlowPlatform controller
+       // will automatically create a PodDisruptionBudget based on this 
specification that targets the service Deployment.
+       // Currently only applies to the Data Index.
+       // +optional
+       PodDisruptionBudget *PodDisruptionBudgetSpec 
`json:"podDisruptionBudget,omitempty"`
 }
diff --git a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go 
b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
index eb204d1553d..ca959549d29 100644
--- a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
+++ b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
@@ -56,6 +56,11 @@ type FlowPodTemplateSpec struct {
        // Defines the kind of deployment model for this pod spec. In dev 
profile, only "kubernetes" is valid.
        // +optional
        DeploymentModel DeploymentModel `json:"deploymentModel,omitempty"`
+       // Defines the Kubernetes PodDisruptionBudgetSpec for this workflow. 
When configured, the SonataFlow controller will
+       // automatically create a PodDisruptionBudget based on this 
specification that targets the workflow Deployment.
+       // Ignored in "knative" deployment model, and dev profile workflows.
+       // +optional
+       PodDisruptionBudget *PodDisruptionBudgetSpec 
`json:"podDisruptionBudget,omitempty"`
 }
 
 // Flow describes the contents of the Workflow definition following the CNCF 
Serverless Workflow Specification.
diff --git 
a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go 
b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
index 22faee96338..bf8c458e524 100644
--- a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
+++ b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
@@ -28,6 +28,7 @@ import (
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/util/intstr"
        "knative.dev/pkg/apis"
        duckv1 "knative.dev/pkg/apis/duck/v1"
 )
@@ -354,6 +355,11 @@ func (in *FlowPodTemplateSpec) DeepCopyInto(out 
*FlowPodTemplateSpec) {
                *out = new(int32)
                **out = **in
        }
+       if in.PodDisruptionBudget != nil {
+               in, out := &in.PodDisruptionBudget, &out.PodDisruptionBudget
+               *out = new(PodDisruptionBudgetSpec)
+               (*in).DeepCopyInto(*out)
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new FlowPodTemplateSpec.
@@ -549,6 +555,31 @@ func (in *PlatformServicesStatus) DeepCopy() 
*PlatformServicesStatus {
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *PodDisruptionBudgetSpec) DeepCopyInto(out *PodDisruptionBudgetSpec) {
+       *out = *in
+       if in.MinAvailable != nil {
+               in, out := &in.MinAvailable, &out.MinAvailable
+               *out = new(intstr.IntOrString)
+               **out = **in
+       }
+       if in.MaxUnavailable != nil {
+               in, out := &in.MaxUnavailable, &out.MaxUnavailable
+               *out = new(intstr.IntOrString)
+               **out = **in
+       }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new PodDisruptionBudgetSpec.
+func (in *PodDisruptionBudgetSpec) DeepCopy() *PodDisruptionBudgetSpec {
+       if in == nil {
+               return nil
+       }
+       out := new(PodDisruptionBudgetSpec)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *PodSpec) DeepCopyInto(out *PodSpec) {
        *out = *in
@@ -722,6 +753,11 @@ func (in *PodTemplateSpec) DeepCopyInto(out 
*PodTemplateSpec) {
                *out = new(int32)
                **out = **in
        }
+       if in.PodDisruptionBudget != nil {
+               in, out := &in.PodDisruptionBudget, &out.PodDisruptionBudget
+               *out = new(PodDisruptionBudgetSpec)
+               (*in).DeepCopyInto(*out)
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new PodTemplateSpec.
diff --git a/packages/sonataflow-operator/config/rbac/role.yaml 
b/packages/sonataflow-operator/config/rbac/role.yaml
index f5bc2d75e7b..845fe910a07 100644
--- a/packages/sonataflow-operator/config/rbac/role.yaml
+++ b/packages/sonataflow-operator/config/rbac/role.yaml
@@ -52,6 +52,18 @@ rules:
       - list
       - update
       - watch
+  - apiGroups:
+      - policy
+    resources:
+      - poddisruptionbudgets
+    verbs:
+      - create
+      - delete
+      - get
+      - list
+      - patch
+      - update
+      - watch
   - apiGroups:
       - serving.knative.dev
     resources:
diff --git 
a/packages/sonataflow-operator/internal/controller/platform/autoscaling.go 
b/packages/sonataflow-operator/internal/controller/platform/autoscaling.go
deleted file mode 100644
index 2a4dad03109..00000000000
--- a/packages/sonataflow-operator/internal/controller/platform/autoscaling.go
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package platform
-
-import (
-       "context"
-
-       v1 "k8s.io/api/core/v1"
-       "k8s.io/klog/v2"
-
-       autoscalingv2 "k8s.io/api/autoscaling/v2"
-       "sigs.k8s.io/controller-runtime/pkg/client"
-
-       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
-)
-
-// findHPAForDeployment returns the HorizontalPodAutoscaler targeting a 
deployment in a given namespace, or nil if it
-// doesn't exist.
-// Note: By k8s definition, the HorizontalPodAutoscaler must belong to the 
same namespace as the managed deployment.
-func findHPAForDeployment(ctx context.Context, c client.Client, namespace 
string, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
-       klog.V(log.D).Infof("Querying HorizontalPodAutoscalers in namespace: 
%s", namespace)
-       var hpaList autoscalingv2.HorizontalPodAutoscalerList
-       if err := c.List(ctx, &hpaList, client.InNamespace(namespace)); err != 
nil {
-               return nil, err
-       }
-       klog.V(log.D).Infof("Total number of returned HorizontalPodAutoscalers 
is: %d", len(hpaList.Items))
-       for _, hpa := range hpaList.Items {
-               ref := hpa.Spec.ScaleTargetRef
-               klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler name: 
%s, ref.Kind: %s, ref.Name: %s, ref.APIVersion: %s, targets deployment: %s.", 
hpa.Name, ref.Kind, ref.Name, ref.APIVersion, name)
-               if ref.Kind == "Deployment" && ref.Name == name && 
ref.APIVersion == "apps/v1" {
-                       klog.V(log.D).Infof("HorizontalPodAutoscaler name: %s 
targets deployment: %s.", hpa.Name, name)
-                       return &hpa, nil
-               }
-       }
-       klog.V(log.D).Infof("No HorizontalPodAutoscaler targets deployment: %s 
in namespace: %s.", name, namespace)
-       return nil, nil
-}
-
-// hpaIsActive returns true if the HorizontalPodAutoscaler is active.
-func hpaIsActive(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
-       klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler is Active.")
-       for _, cond := range hpa.Status.Conditions {
-               klog.V(log.D).Infof("Checking Status condition type: %s, %s.", 
cond.Type, cond.Status)
-               if cond.Type == autoscalingv2.ScalingActive {
-                       return cond.Status == v1.ConditionTrue
-               }
-       }
-       return false
-}
-
-// hpaIsWorking returns true if the HorizontalPodAutoscaler has started to 
take care of the scaling for the
-// corresponding target ref. At this point, our controllers must transfer the 
control to the HorizontalPodAutoscaler
-// and let it manage the replicas.
-func hpaIsWorking(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
-       return hpaIsActive(hpa) || hpa.Status.DesiredReplicas > 0
-}
diff --git a/packages/sonataflow-operator/internal/controller/platform/k8s.go 
b/packages/sonataflow-operator/internal/controller/platform/k8s.go
index 71a65e08146..753416d9553 100644
--- a/packages/sonataflow-operator/internal/controller/platform/k8s.go
+++ b/packages/sonataflow-operator/internal/controller/platform/k8s.go
@@ -24,6 +24,7 @@ import (
        "fmt"
 
        v2 "k8s.io/api/autoscaling/v2"
+       policyv1 "k8s.io/api/policy/v1"
 
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/version"
 
@@ -103,19 +104,25 @@ func (action *serviceAction) Handle(ctx context.Context, 
platform *operatorapi.S
 }
 
 func createOrUpdateServiceComponents(ctx context.Context, client 
client.Client, platform *operatorapi.SonataFlowPlatform, psh 
services.PlatformServiceHandler) (*corev1.Event, error) {
-       if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != 
nil {
+       var deployment *appsv1.Deployment
+       var hpa *v2.HorizontalPodAutoscaler
+       var err error
+       if err = createOrUpdateConfigMap(ctx, client, platform, psh); err != 
nil {
                return nil, err
        }
-       if err := createOrUpdateDeployment(ctx, client, platform, psh); err != 
nil {
+       if deployment, hpa, err = createOrUpdateDeployment(ctx, client, 
platform, psh); err != nil {
                return nil, err
        }
-       if err := createOrUpdateService(ctx, client, platform, psh); err != nil 
{
+       if err = createOrUpdatePDB(ctx, client, platform, psh, deployment, 
hpa); err != nil {
+               return nil, err
+       }
+       if err = createOrUpdateService(ctx, client, platform, psh); err != nil {
                return nil, err
        }
        return createOrUpdateKnativeResources(ctx, client, platform, psh)
 }
 
-func createOrUpdateDeployment(ctx context.Context, client client.Client, 
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) 
error {
+func createOrUpdateDeployment(ctx context.Context, client client.Client, 
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) 
(*appsv1.Deployment, *v2.HorizontalPodAutoscaler, error) {
        readyProbe := &corev1.Probe{
                ProbeHandler: corev1.ProbeHandler{
                        HTTPGet: &corev1.HTTPGetAction{
@@ -157,7 +164,7 @@ func createOrUpdateDeployment(ctx context.Context, client 
client.Client, platfor
        serviceContainer = psh.ConfigurePersistence(serviceContainer)
        serviceContainer, err := psh.MergeContainerSpec(serviceContainer)
        if err != nil {
-               return err
+               return nil, nil, err
        }
 
        // immutable
@@ -165,15 +172,15 @@ func createOrUpdateDeployment(ctx context.Context, client 
client.Client, platfor
 
        var hpa *v2.HorizontalPodAutoscaler = nil
        if psh.AcceptsHPA() {
-               hpa, err = findHPAForDeployment(ctx, utils.GetClient(), 
platform.Namespace, psh.GetServiceName())
+               hpa, err = kubeutil.FindHPAForDeployment(ctx, 
utils.GetClient(), platform.Namespace, psh.GetServiceName())
                if err != nil {
-                       return fmt.Errorf("failed to find a potential 
HorizontalPodAutoscaler for deployment %s/%s: %v", platform.Namespace, 
psh.GetServiceName(), err)
+                       return nil, nil, fmt.Errorf("failed to find a potential 
HorizontalPodAutoscaler for deployment %s/%s: %v", platform.Namespace, 
psh.GetServiceName(), err)
                }
                klog.V(log.D).Infof("HorizontalPodAutoscaler exists for 
deployment %s/%s: %t.", platform.Namespace, psh.GetServiceName(), hpa != nil)
        }
        kSinkInjected, err := psh.CheckKSinkInjected()
        if err != nil {
-               return nil
+               return nil, nil, err
        }
        lbl, selectorLbl := getLabels(platform, psh)
        serviceDeploymentSpec := appsv1.DeploymentSpec{
@@ -204,7 +211,7 @@ func createOrUpdateDeployment(ctx context.Context, client 
client.Client, platfor
 
        serviceDeploymentSpec.Template.Spec, err = 
psh.MergePodSpec(serviceDeploymentSpec.Template.Spec)
        if err != nil {
-               return err
+               return nil, nil, err
        }
        kubeutil.AddOrReplaceContainer(serviceContainer.Name, 
*serviceContainer, &serviceDeploymentSpec.Template.Spec)
 
@@ -215,7 +222,7 @@ func createOrUpdateDeployment(ctx context.Context, client 
client.Client, platfor
                        Labels:    lbl,
                }}
        if err := controllerutil.SetControllerReference(platform, 
serviceDeployment, client.Scheme()); err != nil {
-               return err
+               return nil, nil, err
        }
 
        // Create or Update the deployment
@@ -224,7 +231,7 @@ func createOrUpdateDeployment(ctx context.Context, client 
client.Client, platfor
                err := mergo.Merge(&(serviceDeployment.Spec), 
serviceDeploymentSpec, mergo.WithOverride)
                // mergo.Merge algorithm is not setting the 
serviceDeployment.Spec.Replicas when the
                // *serviceDeploymentSpec.Replicas is 0. Making impossible to 
scale to zero. Ensure the value.
-               if hpa == nil || !hpaIsWorking(hpa) || psh.GetReplicaCount() == 
0 {
+               if hpa == nil || !kubeutil.HPAIsWorking(hpa) || 
psh.GetReplicaCount() == 0 {
                        // Only when no HorizontalPodAutoscaler was created for 
current deployment, we should manage the replicas.
                        // Or, when the existing one did not wake up from a 
previous inactive period due to a replicas set to 0.
                        // In this last case, we should still let the 
controller the chance to set the replicas to wake up the 
HorizontalPodAutoscaler.
@@ -240,10 +247,59 @@ func createOrUpdateDeployment(ctx context.Context, client 
client.Client, platfor
                }
                return nil
        }); err != nil {
-               return err
+               return nil, nil, err
        } else {
                klog.V(log.I).InfoS("Deployment successfully reconciled", 
"operation", op)
        }
+       return serviceDeployment, hpa, nil
+}
+
+func createOrUpdatePDB(ctx context.Context, c client.Client, platform 
*operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler, 
deployment *appsv1.Deployment, hpa *v2.HorizontalPodAutoscaler) error {
+       if psh.AcceptsPDB() {
+               createOrUpdate := false
+               pdbSpec := psh.GetPDBSpec()
+               if !kubeutil.IsEmptyPodDisruptionBudgetSpec(pdbSpec) {
+                       if hpa != nil {
+                               // The HPA determines the replicas. Be sure 
that the service can't be later downscaled to a number of replicas that blocks 
a drain.
+                               // And also, that the user didn't voluntarily 
scale the service to 0.
+                               createOrUpdate = 
kubeutil.HPAMinReplicasIsGreaterThan(hpa, int32(1)) && 
!kubeutil.DeploymentIsScaledToZero(deployment)
+                       } else {
+                               // The just reconciled deployment replicas were 
already configured properly, we can rely on this number.
+                               // Be sure that the number of replicas doesn't 
block a drain.
+                               createOrUpdate = 
kubeutil.DeploymentReplicasIsGreaterThan(deployment, int32(1))
+                       }
+               }
+
+               if createOrUpdate {
+                       lbl, selectorLbl := getLabels(platform, psh)
+                       podDisruptionBudget := &policyv1.PodDisruptionBudget{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      psh.GetServiceName(),
+                                       Namespace: platform.Namespace,
+                                       Labels:    lbl,
+                               },
+                       }
+                       if err := 
controllerutil.SetControllerReference(platform, podDisruptionBudget, 
c.Scheme()); err != nil {
+                               return err
+                       }
+                       if op, err := controllerutil.CreateOrUpdate(ctx, c, 
podDisruptionBudget, func() error {
+                               
kubeutil.ApplyPodDisruptionBudgetSpec(podDisruptionBudget, pdbSpec)
+                               podDisruptionBudget.Spec.Selector = 
&metav1.LabelSelector{
+                                       MatchLabels: selectorLbl,
+                               }
+                               return nil
+                       }); err != nil {
+                               return err
+                       } else {
+                               klog.V(log.I).Infof("PodDisruptionBudget %s/%s 
successfully reconciled, op: %s.", podDisruptionBudget.Namespace, 
podDisruptionBudget.Name, op)
+                       }
+               } else {
+                       // Delete a potentially existing PDB.
+                       if err := kubeutil.SafeDeletePodDisruptionBudget(ctx, 
c, platform.Namespace, psh.GetServiceName()); err != nil {
+                               return err
+                       }
+               }
+       }
        return nil
 }
 
diff --git 
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
 
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
index f596821f335..e0732bdc3dd 100644
--- 
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
+++ 
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
@@ -125,6 +125,12 @@ type PlatformServiceHandler interface {
 
        // AcceptsHPA returns true if the service accepts an external HPA 
configuration.
        AcceptsHPA() bool
+
+       // AcceptsPDB returns true if the service accepts the operator managed 
PDB generation.
+       AcceptsPDB() bool
+
+       // GetPDBSpec returns the configured PodDisruptionBudgetSpec for the 
given service.
+       GetPDBSpec() *operatorapi.PodDisruptionBudgetSpec
 }
 
 type DataIndexHandler struct {
@@ -135,6 +141,14 @@ func (d *DataIndexHandler) AcceptsHPA() bool {
        return true
 }
 
+func (d *DataIndexHandler) AcceptsPDB() bool {
+       return true
+}
+
+func (d *DataIndexHandler) GetPDBSpec() *operatorapi.PodDisruptionBudgetSpec {
+       return 
d.platform.Spec.Services.DataIndex.ServiceSpec.PodTemplate.PodDisruptionBudget
+}
+
 // GetDBMigrationStrategy returns DB migration approach
 func (d *DataIndexHandler) GetDBMigrationStrategy() 
operatorapi.DBMigrationStrategyType {
        return 
GetDBMigrationStrategy(d.platform.Spec.Services.DataIndex.Persistence)
@@ -354,6 +368,14 @@ func (d *JobServiceHandler) AcceptsHPA() bool {
        return false
 }
 
+func (d *JobServiceHandler) AcceptsPDB() bool {
+       return false
+}
+
+func (d *JobServiceHandler) GetPDBSpec() *operatorapi.PodDisruptionBudgetSpec {
+       return nil
+}
+
 // GetDBMigrationStrategy returns db migration approach otherwise
 func (j *JobServiceHandler) GetDBMigrationStrategy() 
operatorapi.DBMigrationStrategyType {
        return 
GetDBMigrationStrategy(j.platform.Spec.Services.JobService.Persistence)
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
 
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
index 2a88206ac31..c082b7960bf 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
@@ -24,6 +24,8 @@ import (
        "fmt"
        "strings"
 
+       policyv1 "k8s.io/api/policy/v1"
+
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
 
        servingv1 "knative.dev/serving/pkg/apis/serving/v1"
@@ -465,3 +467,22 @@ func ServiceMonitorCreator(workflow 
*operatorapi.SonataFlow) (client.Object, err
        }
        return serviceMonitor, nil
 }
+
+// PodDisruptionBudgetCreator creates a PodDisruptionBudget for workflow.
+func PodDisruptionBudgetCreator(workflow *operatorapi.SonataFlow) 
(client.Object, error) {
+       lbl := workflowproj.GetMergedLabels(workflow)
+       podDisruptionBudget := &policyv1.PodDisruptionBudget{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      workflow.Name,
+                       Namespace: workflow.Namespace,
+                       Labels:    lbl,
+               },
+               Spec: policyv1.PodDisruptionBudgetSpec{
+                       Selector: &metav1.LabelSelector{
+                               MatchLabels: 
workflowproj.GetSelectorLabels(workflow),
+                       },
+               },
+       }
+       kubeutil.ApplyPodDisruptionBudgetSpec(podDisruptionBudget, 
workflow.Spec.PodTemplate.PodDisruptionBudget)
+       return podDisruptionBudget, nil
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
 
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
index 80031116d77..865242c7c6c 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
@@ -149,6 +149,15 @@ func (d *DeploymentReconciler) ensureObjects(ctx 
context.Context, workflow *oper
        }
 
        objs := []client.Object{deployment, managedPropsCM, service}
+
+       podDisruptionBudget, err := 
NewPodDisruptionBudgetHandler(d.StateSupport).Ensure(ctx, workflow)
+       if err != nil {
+               return reconcile.Result{}, nil, err
+       }
+       if podDisruptionBudget != nil {
+               objs = append(objs, podDisruptionBudget)
+       }
+
        eventingObjs, err := common.NewKnativeEventingHandler(d.StateSupport, 
pl).Ensure(ctx, workflow)
        if err != nil {
                return reconcile.Result{}, nil, err
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/preview/disruption_budget_handler.go
 
b/packages/sonataflow-operator/internal/controller/profiles/preview/disruption_budget_handler.go
new file mode 100644
index 00000000000..dac01581ba9
--- /dev/null
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/preview/disruption_budget_handler.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package preview
+
+import (
+       "context"
+       "fmt"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/workflowproj"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/workflowdef"
+
+       "k8s.io/klog/v2"
+
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+
+       policyv1 "k8s.io/api/policy/v1"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
+
+       operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common"
+)
+
// podDisruptionBudgetHandler is the default PodDisruptionBudgetHandler implementation.
// It delegates creation/update of the PDB object to an ObjectEnsurer configured with
// common.PodDisruptionBudgetCreator.
type podDisruptionBudgetHandler struct {
	stateSupport        *common.StateSupport
	podDisruptionBudget common.ObjectEnsurer
}

// PodDisruptionBudgetHandler reconciles the PodDisruptionBudget associated with a
// SonataFlow workflow deployment. Ensure returns the ensured object, or nil when no
// PodDisruptionBudget must exist for the workflow.
type PodDisruptionBudgetHandler interface {
	Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) (client.Object, error)
}

// NewPodDisruptionBudgetHandler builds a PodDisruptionBudgetHandler backed by the
// given state support.
func NewPodDisruptionBudgetHandler(support *common.StateSupport) PodDisruptionBudgetHandler {
	return podDisruptionBudgetHandler{
		stateSupport:        support,
		podDisruptionBudget: common.NewObjectEnsurer(support.C, common.PodDisruptionBudgetCreator),
	}
}
+
+func (h podDisruptionBudgetHandler) Ensure(ctx context.Context, workflow 
*operatorapi.SonataFlow) (client.Object, error) {
+       if workflow.Spec.PodTemplate.DeploymentModel == 
operatorapi.KnativeDeploymentModel {
+               return nil, nil
+       }
+
+       createOrUpdate := false
+       if 
!kubernetes.IsEmptyPodDisruptionBudgetSpec(workflow.Spec.PodTemplate.PodDisruptionBudget)
 {
+               klog.V(log.D).Infof("Finding HPA for workflow: %s/%s", 
workflow.Namespace, workflow.Name)
+               hpa, err := kubernetes.FindHPAForWorkflow(ctx, 
h.stateSupport.C, workflow.Namespace, workflow.Name)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to find a potential 
HorizontalPodAutoscaler for workflow: %s/%s: %v", workflow.Namespace, 
workflow.Name, err)
+               }
+               if hpa != nil {
+                       klog.V(log.D).Infof("HPA %s/%s was found for workflow 
%s/%s", hpa.Namespace, hpa.Name, workflow.Namespace, workflow.Name)
+                       // The HPA determines the replicas. Be sure that the 
workflow can't be later downscaled to a number of replicas that blocks a drain.
+                       // And, also that the user didn't voluntary scaled the 
workflow to 0.
+                       createOrUpdate = 
kubernetes.HPAMinReplicasIsGreaterThan(hpa, int32(1)) && 
!workflowdef.IsScaledToZero(workflow)
+               } else {
+                       // The replicas are determined from the workflow spec. 
Be sure that the number of replicas don't block a drain.
+                       createOrUpdate = 
workflowdef.ReplicasIsGreaterThan(workflow, int32(1))
+               }
+       }
+
+       if createOrUpdate {
+               pdb, _, err := h.podDisruptionBudget.Ensure(ctx, workflow, 
func(object client.Object) controllerutil.MutateFn {
+                       return func() error {
+                               targetPdb := 
object.(*policyv1.PodDisruptionBudget)
+                               targetPdb.Spec.Selector = &metav1.LabelSelector{
+                                       MatchLabels: 
workflowproj.GetSelectorLabels(workflow),
+                               }
+                               
kubernetes.ApplyPodDisruptionBudgetSpec(targetPdb, 
workflow.Spec.PodTemplate.PodDisruptionBudget)
+                               return nil
+                       }
+               })
+               if err != nil {
+                       return nil, fmt.Errorf("failed to create or update 
PodDiscruptionBudget for workflow's deployment %s/%s: %v", workflow.Namespace, 
workflow.Name, err)
+               }
+               return pdb, nil
+       } else {
+               // Remove a potential previously created PDB if any.
+               if err := kubernetes.SafeDeletePodDisruptionBudget(ctx, 
h.stateSupport.C, workflow.Namespace, workflow.Name); err != nil {
+                       return nil, err
+               }
+               return nil, nil
+       }
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go 
b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
index 65a3d184aa4..800b8c8191a 100644
--- a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
+++ b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
@@ -24,6 +24,14 @@ import (
        "fmt"
        "time"
 
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
+
+       autoscalingv2 "k8s.io/api/autoscaling/v2"
+       pkgbuilder "sigs.k8s.io/controller-runtime/pkg/builder"
+       "sigs.k8s.io/controller-runtime/pkg/predicate"
+
+       v1 "k8s.io/api/policy/v1"
+
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
 
@@ -72,6 +80,8 @@ import (
 
        operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform"
+
+       ctrlevent "sigs.k8s.io/controller-runtime/pkg/event"
 )
 
 // SonataFlowReconciler reconciles a SonataFlow object
@@ -88,6 +98,7 @@ type SonataFlowReconciler struct {
 
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
 
//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
 
//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete
+//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
@@ -374,6 +385,7 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr 
ctrl.Manager) error {
        builder := ctrl.NewControllerManagedBy(mgr).
                For(&operatorapi.SonataFlow{}).
                Owns(&appsv1.Deployment{}).
+               Owns(&v1.PodDisruptionBudget{}).
                Owns(&corev1.Service{}).
                Owns(&corev1.ConfigMap{}).
                Owns(&operatorapi.SonataFlowBuild{}).
@@ -392,7 +404,8 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr 
ctrl.Manager) error {
                                return []reconcile.Request{}
                        }
                        return buildEnqueueRequestsFromMapFunc(mgr.GetClient(), 
build)
-               }))
+               })).
+               Watches(&autoscalingv2.HorizontalPodAutoscaler{}, 
handler.EnqueueRequestsFromMapFunc(r.mapHPAToSonataFlowRequests), 
pkgbuilder.WithPredicates(hpaToSonataFlowPredicate()))
 
        knativeAvail, err := knative.GetKnativeAvailability(mgr.GetConfig())
        if err != nil {
@@ -416,3 +429,43 @@ func (r *SonataFlowReconciler) SetupWithManager(mgr 
ctrl.Manager) error {
 
        return builder.Complete(r)
 }
+
+// hpaToSonataFlowPredicate filters the HorizontalPodAutoscaler events that 
might require attention by the SonataFlow
+// controller, i.e., those HorizontalPodAutoscalers that points to a 
SonataFlow and has relevant changes rather than
+// status updates. Note that changes in such HorizontalPodAutoscaler might for 
example have impact on the potentially
+// generated PodDisruptionBudget.
+func hpaToSonataFlowPredicate() predicate.Funcs {
+       return predicate.Funcs{
+               CreateFunc: func(e ctrlevent.CreateEvent) bool {
+                       return 
kubernetes.IsHPAndTargetsASonataFlowAsBool(e.Object)
+               },
+               UpdateFunc: func(e ctrlevent.UpdateEvent) bool {
+                       oldHpa, oldHpaOk := 
kubernetes.IsHPAndTargetsASonataFlow(e.ObjectOld)
+                       newHpa, newHpaOK := 
kubernetes.IsHPAndTargetsASonataFlow(e.ObjectNew)
+                       if oldHpaOk || newHpaOK {
+                               return !kubernetes.HPAEqualsBySpec(oldHpa, 
newHpa)
+                       }
+                       return false
+               },
+               DeleteFunc: func(e ctrlevent.DeleteEvent) bool {
+                       return 
kubernetes.IsHPAndTargetsASonataFlowAsBool(e.Object)
+               },
+               GenericFunc: func(e ctrlevent.GenericEvent) bool {
+                       return 
kubernetes.IsHPAndTargetsASonataFlowAsBool(e.Object)
+               },
+       }
+}
+
+// mapHPAToSonataFlowRequests given a HorizontalPodAutoscaler that targets a 
SonataFlow, returns the recon request to
+// that workflow.
+func (r *SonataFlowReconciler) mapHPAToSonataFlowRequests(ctx context.Context, 
object client.Object) []reconcile.Request {
+       hpa := object.(*autoscalingv2.HorizontalPodAutoscaler)
+       return []reconcile.Request{
+               {
+                       NamespacedName: types.NamespacedName{
+                               Namespace: hpa.Namespace,
+                               Name:      hpa.Spec.ScaleTargetRef.Name,
+                       },
+               },
+       }
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
 
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
index ecddba47527..e554c7cbde2 100644
--- 
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
+++ 
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go
@@ -22,6 +22,7 @@ package controller
 import (
        "context"
        "fmt"
+       "strings"
        "time"
 
        autoscalingv2 "k8s.io/api/autoscaling/v2"
@@ -59,6 +60,8 @@ import (
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
        "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+
+       kubeutil 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
 )
 
 // SonataFlowPlatformReconciler reconciles a SonataFlowPlatform object
@@ -78,6 +81,7 @@ type SonataFlowPlatformReconciler struct {
 
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms/finalizers,verbs=update
 
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
 
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch
+//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
@@ -277,21 +281,6 @@ func (r *SonataFlowPlatformReconciler) 
updateIfActiveClusterPlatformExists(ctx c
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *SonataFlowPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) 
error {
-       pred := predicate.Funcs{
-               CreateFunc: func(e ctrlevent.CreateEvent) bool {
-                       return false
-               },
-               UpdateFunc: func(e ctrlevent.UpdateEvent) bool {
-                       return false
-               },
-               DeleteFunc: func(e ctrlevent.DeleteEvent) bool {
-                       return true
-               },
-               GenericFunc: func(e ctrlevent.GenericEvent) bool {
-                       return false
-               },
-       }
-
        builder := ctrlrun.NewControllerManagedBy(mgr).
                For(&operatorapi.SonataFlowPlatform{}).
                Owns(&appsv1.Deployment{}).
@@ -299,7 +288,7 @@ func (r *SonataFlowPlatformReconciler) SetupWithManager(mgr 
ctrlrun.Manager) err
                Owns(&corev1.ConfigMap{}).
                Watches(&operatorapi.SonataFlowPlatform{}, 
handler.EnqueueRequestsFromMapFunc(r.mapPlatformToPlatformRequests)).
                Watches(&operatorapi.SonataFlowClusterPlatform{}, 
handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToPlatformRequests)).
-               Watches(&autoscalingv2.HorizontalPodAutoscaler{}, 
handler.EnqueueRequestsFromMapFunc(r.mapHPAToPlatformRequests), 
pkgbuilder.WithPredicates(pred))
+               Watches(&autoscalingv2.HorizontalPodAutoscaler{}, 
handler.EnqueueRequestsFromMapFunc(r.mapHPAToPlatformRequests), 
pkgbuilder.WithPredicates(hpaToSonataFlowPlatformServicePredicate()))
 
        knativeAvail, err := knative.GetKnativeAvailability(mgr.GetConfig())
        if err != nil {
@@ -369,15 +358,15 @@ func contains(slice []operatorapi.WorkFlowCapability, s 
operatorapi.WorkFlowCapa
        return false
 }
 
-// mapHPAToPlatformRequests determines if the referred HorizontalPodAutoscaler 
targets a DI service in given namespace,
-// if any. Analyses if that namespace has an SFP that manges the DI service. 
If that is the case, enqueues a Request to
-// the SFP to give the chance to take an action if needed. We normally want to 
do that in situations where the
-// HorizontalPodAutoscaler was removed (DeleteEvent) to let the SFP restore 
back the DI replicas. In other situations
-// the given HorizontalPodAutoscaler will take the needed actions if needed.
+// mapHPAToPlatformRequests determines if the referred HorizontalPodAutoscaler 
targets a DI service in the given namespace,
+// if any. Analyses if that namespace has an SFP that manages the DI service. 
If that is the case, enqueues a Request to
+// the SFP to give the chance to take an action if needed. We normally want to 
do that in situations where the
+// HorizontalPodAutoscaler was removed or has changed to let the SFP 
controller restore back the DI replicas or manage
+// a potential PodDisruptionBudget configuration accordingly.
 func (r *SonataFlowPlatformReconciler) mapHPAToPlatformRequests(ctx 
context.Context, object client.Object) []reconcile.Request {
-       hpa := object.(*autoscalingv2.HorizontalPodAutoscaler)
-       klog.V(log.D).Infof("HorizontalPodAutoscaler %s/%s with 
spec.ScaleTargetRef.Kind Kind: %s was deleted.", hpa.Namespace, hpa.Name, 
hpa.Spec.ScaleTargetRef.Kind)
-       if hpa.Spec.ScaleTargetRef.Kind == "Deployment" {
+       hpa, ok := kubeutil.IsHPAndTargetsADeployment(object)
+       klog.V(log.D).Infof("HorizontalPodAutoscaler %s/%s with 
spec.ScaleTargetRef.Kind Kind: %s requires SFP controller attention.", 
hpa.Namespace, hpa.Name, hpa.Spec.ScaleTargetRef.Kind)
+       if ok {
                sfp, err := platform.GetActivePlatform(ctx, r.Client, 
hpa.Namespace, false)
                if err != nil {
                        klog.V(log.D).Infof("failed to get active platform in 
namespace: %s, %v", hpa.Namespace, err)
@@ -401,3 +390,41 @@ func (r *SonataFlowPlatformReconciler) 
mapHPAToPlatformRequests(ctx context.Cont
        }
        return nil
 }
+
+// hpaToSonataFlowPlatformServicePredicate returns the predicate functions to 
filter HorizontalPodAutoscaler events
+// that might require attention from the SFP controller.
+func hpaToSonataFlowPlatformServicePredicate() predicate.Funcs {
+       return predicate.Funcs{
+               CreateFunc: func(e ctrlevent.CreateEvent) bool {
+                       return 
isHPAndCandidateToTargetADataIndexDeploymentAsBool(e.Object)
+               },
+               UpdateFunc: func(e ctrlevent.UpdateEvent) bool {
+                       oldHpa, oldHpaOk := 
isHPAndCandidateToTargetADataIndexDeployment(e.ObjectOld)
+                       newHpa, newHpaOK := 
isHPAndCandidateToTargetADataIndexDeployment(e.ObjectNew)
+                       if oldHpaOk || newHpaOK {
+                               return !kubeutil.HPAEqualsBySpec(oldHpa, newHpa)
+                       }
+                       return false
+               },
+               DeleteFunc: func(e ctrlevent.DeleteEvent) bool {
+                       return 
isHPAndCandidateToTargetADataIndexDeploymentAsBool(e.Object)
+               },
+               GenericFunc: func(e ctrlevent.GenericEvent) bool {
+                       return 
isHPAndCandidateToTargetADataIndexDeploymentAsBool(e.Object)
+               },
+       }
+}
+
+func isHPAndCandidateToTargetADataIndexDeployment(obj client.Object) 
(*autoscalingv2.HorizontalPodAutoscaler, bool) {
+       if hpa, ok := kubeutil.IsHPAndTargetsADeployment(obj); ok {
+               if strings.HasSuffix(hpa.Spec.ScaleTargetRef.Name, 
constants.DataIndexServiceName) {
+                       return hpa, true
+               }
+       }
+       return nil, false
+}
+
+func isHPAndCandidateToTargetADataIndexDeploymentAsBool(obj client.Object) 
bool {
+       _, ok := isHPAndCandidateToTargetADataIndexDeployment(obj)
+       return ok
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/workflowdef/utils.go 
b/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
index 17a0eb06bc7..d4b97b5bd07 100644
--- a/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
+++ b/packages/sonataflow-operator/internal/controller/workflowdef/utils.go
@@ -127,3 +127,15 @@ func GenerateScaleSelector(workflow 
*operatorapi.SonataFlow) string {
        labelSet := labels.Set(workflowproj.GetSelectorLabels(workflow))
        return labelSet.AsSelector().String()
 }
+
+// IsScaledToZero returns true if the workflow has been explicitly scaled to 
zero, i.e., by setting
+// the spec.podTemplate.replicas == 0. False in any other case, including when 
spec.podTemplate.replicas  == nil.
+func IsScaledToZero(workflow *operatorapi.SonataFlow) bool {
+       return workflow.Spec.PodTemplate.Replicas != nil && 
*workflow.Spec.PodTemplate.Replicas == int32(0)
+}
+
+// ReplicasIsGreaterThan returns true if the workflow configured 
Spec.PodTemplate.Replicas is != nil, and greater
+// than the given value. False in any other case.
+func ReplicasIsGreaterThan(workflow *operatorapi.SonataFlow, value int32) bool 
{
+       return workflow.Spec.PodTemplate.Replicas != nil && 
*workflow.Spec.PodTemplate.Replicas > value
+}
diff --git a/packages/sonataflow-operator/operator.yaml 
b/packages/sonataflow-operator/operator.yaml
index eee14236f37..86f720fea0b 100644
--- a/packages/sonataflow-operator/operator.yaml
+++ b/packages/sonataflow-operator/operator.yaml
@@ -6764,6 +6764,33 @@ spec:
                                 defined in the corresponding RuntimeClass, 
otherwise it will remain unset and treated as zero.
                                 More info: 
https://git.k8s.io/enhancements/keps/sig-node/688-pod-overhead/README.md
                               type: object
+                            podDisruptionBudget:
+                              description: |-
+                                Defines the Kubernetes PodDisruptionBudgetSpec 
for this service. When configured, the SonataFlowPlatform controller
+                                will automatically create a 
PodDisruptionBudget based on this specification that targets the service 
Deployment.
+                                Currently only applies to the Data Index.
+                              properties:
+                                maxUnavailable:
+                                  anyOf:
+                                    - type: integer
+                                    - type: string
+                                  description: |-
+                                    An eviction is allowed if at most 
"maxUnavailable" pods selected by
+                                    "selector" are unavailable after the 
eviction, i.e. even in absence of
+                                    the evicted pod. For example, one can 
prevent all voluntary evictions
+                                    by specifying 0. This is a mutually 
exclusive setting with "minAvailable".
+                                  x-kubernetes-int-or-string: true
+                                minAvailable:
+                                  anyOf:
+                                    - type: integer
+                                    - type: string
+                                  description: |-
+                                    An eviction is allowed if at least 
"minAvailable" pods selected by
+                                    "selector" will still be available after 
the eviction, i.e. even in the
+                                    absence of the evicted pod.  So for 
example you can prevent all voluntary
+                                    evictions by specifying "100%". This is a 
mutually exclusive setting with "maxUnavailable".
+                                  x-kubernetes-int-or-string: true
+                              type: object
                             preemptionPolicy:
                               description: |-
                                 PreemptionPolicy is the Policy for preempting 
pods with lower priority.
@@ -14797,6 +14824,33 @@ spec:
                                 defined in the corresponding RuntimeClass, 
otherwise it will remain unset and treated as zero.
                                 More info: 
https://git.k8s.io/enhancements/keps/sig-node/688-pod-overhead/README.md
                               type: object
+                            podDisruptionBudget:
+                              description: |-
+                                Defines the Kubernetes PodDisruptionBudgetSpec 
for this service. When configured, the SonataFlowPlatform controller
+                                will automatically create a 
PodDisruptionBudget based on this specification that targets the service 
Deployment.
+                                Currently only applies to the Data Index.
+                              properties:
+                                maxUnavailable:
+                                  anyOf:
+                                    - type: integer
+                                    - type: string
+                                  description: |-
+                                    An eviction is allowed if at most 
"maxUnavailable" pods selected by
+                                    "selector" are unavailable after the 
eviction, i.e. even in absence of
+                                    the evicted pod. For example, one can 
prevent all voluntary evictions
+                                    by specifying 0. This is a mutually 
exclusive setting with "minAvailable".
+                                  x-kubernetes-int-or-string: true
+                                minAvailable:
+                                  anyOf:
+                                    - type: integer
+                                    - type: string
+                                  description: |-
+                                    An eviction is allowed if at least 
"minAvailable" pods selected by
+                                    "selector" will still be available after 
the eviction, i.e. even in the
+                                    absence of the evicted pod.  So for 
example you can prevent all voluntary
+                                    evictions by specifying "100%". This is a 
mutually exclusive setting with "maxUnavailable".
+                                  x-kubernetes-int-or-string: true
+                              type: object
                             preemptionPolicy:
                               description: |-
                                 PreemptionPolicy is the Policy for preempting 
pods with lower priority.
@@ -24894,6 +24948,33 @@ spec:
                         defined in the corresponding RuntimeClass, otherwise 
it will remain unset and treated as zero.
                         More info: 
https://git.k8s.io/enhancements/keps/sig-node/688-pod-overhead/README.md
                       type: object
+                    podDisruptionBudget:
+                      description: |-
+                        Defines the Kubernetes PodDisruptionBudgetSpec for 
this workflow. When configured, the SonataFlow controller will
+                        automatically create a PodDisruptionBudget based on 
this specification that targets the workflow Deployment.
+                        Ignored in "knative" deployment model, and dev profile 
workflows.
+                      properties:
+                        maxUnavailable:
+                          anyOf:
+                            - type: integer
+                            - type: string
+                          description: |-
+                            An eviction is allowed if at most "maxUnavailable" 
pods selected by
+                            "selector" are unavailable after the eviction, 
i.e. even in absence of
+                            the evicted pod. For example, one can prevent all 
voluntary evictions
+                            by specifying 0. This is a mutually exclusive 
setting with "minAvailable".
+                          x-kubernetes-int-or-string: true
+                        minAvailable:
+                          anyOf:
+                            - type: integer
+                            - type: string
+                          description: |-
+                            An eviction is allowed if at least "minAvailable" 
pods selected by
+                            "selector" will still be available after the 
eviction, i.e. even in the
+                            absence of the evicted pod.  So for example you 
can prevent all voluntary
+                            evictions by specifying "100%". This is a mutually 
exclusive setting with "maxUnavailable".
+                          x-kubernetes-int-or-string: true
+                      type: object
                     preemptionPolicy:
                       description: |-
                         PreemptionPolicy is the Policy for preempting pods 
with lower priority.
@@ -27833,6 +27914,18 @@ rules:
       - list
       - update
       - watch
+  - apiGroups:
+      - policy
+    resources:
+      - poddisruptionbudgets
+    verbs:
+      - create
+      - delete
+      - get
+      - list
+      - patch
+      - update
+      - watch
   - apiGroups:
       - serving.knative.dev
     resources:
diff --git a/packages/sonataflow-operator/test/e2e/helpers.go 
b/packages/sonataflow-operator/test/e2e/helpers.go
index a92c1173bb9..e467c5c7f87 100644
--- a/packages/sonataflow-operator/test/e2e/helpers.go
+++ b/packages/sonataflow-operator/test/e2e/helpers.go
@@ -30,6 +30,8 @@ import (
        "strings"
        "time"
 
+       "k8s.io/apimachinery/pkg/util/intstr"
+
        "sigs.k8s.io/yaml"
 
        operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
@@ -250,7 +252,7 @@ func verifyObjectReplicasFromPath(name string, ns string, 
objetType string, subR
                return false
        }
        GinkgoWriter.Println(fmt.Sprintf("Got response %s", response))
-       replicas, err := extractReplicasFromResponse(response)
+       replicas, err := extractInt32FromResponse(response)
        if err != nil {
                GinkgoWriter.Println(fmt.Errorf("failed to get scale replicas 
from response for object: %s -> %s/%s, subResource: %s, and replicasPath: %s, 
%v", objetType, ns, name, subResource, replicasPath, err))
                return false
@@ -258,15 +260,12 @@ func verifyObjectReplicasFromPath(name string, ns string, 
objetType string, subR
        return replicas == expectedReplicas
 }
 
-func extractReplicasFromResponse(response []byte) (int32, error) {
+func extractInt32FromResponse(response []byte) (int32, error) {
        strResponse := strings.ToLower(string(response))
        if strings.Contains(strResponse, "error") || 
strings.Contains(strResponse, "not found") {
                return -1, fmt.Errorf("%s", response)
        }
-       strResponse = strings.TrimSpace(strings.ReplaceAll(strResponse, "'", 
""))
-       if len(strResponse) == 0 {
-               return -1, fmt.Errorf("%s", response)
-       }
+       strResponse = extractFromSingleQuotedResponse(strResponse)
        replicas, err := strconv.ParseInt(strResponse, 10, 32)
        if err != nil {
                return -1, err
@@ -274,6 +273,51 @@ func extractReplicasFromResponse(response []byte) (int32, 
error) {
        return int32(replicas), nil
 }
 
+func verifyResourceExists(name string, resourceType string, ns string) bool {
+       cmd := exec.Command("kubectl", "get", resourceType, name, "-n", ns, 
"-o", "jsonpath='{.metadata.name}'")
+       response, err := utils.Run(cmd)
+       if err != nil {
+               GinkgoWriter.Println(fmt.Errorf("failed to check if resource 
exists, name: %s, resourceType: %s, ns: %s, %v", name, resourceType, ns, err))
+               return false
+       }
+       strResponse := string(response)
+       if strings.Contains(strResponse, "NotFound") || 
strings.Contains(strResponse, "not found") {
+               return false
+       }
+       return name == extractFromSingleQuotedResponse(strResponse)
+}
+
+func verifyPodDisruptionBudgetConditionHasStatus(name string, ns string, 
condition string, status string) bool {
+       jsonPath := 
fmt.Sprintf("jsonpath='{.status.conditions[?(@.type==\"%s\")].status}'", 
condition)
+       cmd := exec.Command("kubectl", "get", "pdb", name, "-n", ns, "-o", 
jsonPath)
+       response, err := utils.Run(cmd)
+       if err != nil {
+               GinkgoWriter.Println(fmt.Errorf("failed to get status value for 
condition: %s from PodDisruptionBudget: %s/%s, %v", condition, ns, name, err))
+               return false
+       }
+       return status == extractFromSingleQuotedResponse(string(response))
+}
+
+func verifyPodDisruptionBudgetAllowsDisruptionNumber(name string, ns string, 
expectedAllowedDisruptions int32) bool {
+       cmd := exec.Command("kubectl", "get", "pdb", name, "-n", ns, "-o", 
"jsonpath='{.status.disruptionsAllowed}'")
+       response, err := utils.Run(cmd)
+       if err != nil {
+               GinkgoWriter.Println(fmt.Errorf("failed to get the number of 
disruptionsAllowed from PodDisruptionBudget: %s/%s, %v", ns, name, err))
+               return false
+       }
+       allowedDisruptions, err := extractInt32FromResponse(response)
+       if err != nil {
+               return false
+       }
+       return allowedDisruptions == expectedAllowedDisruptions
+}
+
// extractFromSingleQuotedResponse trims surrounding whitespace and then a single pair
// of wrapping single quotes (as produced by kubectl -o jsonpath='...') from a raw
// command response.
//
// Bug fix: the original called strings.TrimPrefix(response, "'") on the untrimmed
// input, discarding the TrimSpace result — so any leading/trailing whitespace (e.g.
// a trailing newline) defeated the quote stripping. All steps now chain on the same
// trimmed value.
func extractFromSingleQuotedResponse(response string) string {
	trimmed := strings.TrimSpace(response)
	trimmed = strings.TrimPrefix(trimmed, "'")
	return strings.TrimSuffix(trimmed, "'")
}
+
 func createTmpCopy(srcPath string) string {
        tmpDir := GinkgoT().TempDir()
        fileName := filepath.Base(srcPath)
@@ -290,12 +334,40 @@ func createTmpCopy(srcPath string) string {
        return dstPath
 }
 
+func setReplicasOrFail(workflowFile string, replicas int32) {
+       if err := setReplicas(workflowFile, replicas); err != nil {
+               GinkgoT().Fatal(err)
+       }
+}
+
 func setImageAndReplicasOrFail(workflowFile, newImage string, replicas int32) {
        if err := setImageAndReplicas(workflowFile, newImage, replicas); err != 
nil {
                GinkgoT().Fatal(err)
        }
 }
 
+func setPodDisruptionBudgetOrFail(workflowFile string, minAvailable 
*intstr.IntOrString, maxUnavailable *intstr.IntOrString) {
+       err := applyWorkflowTransform(workflowFile, func(workflow 
*operatorapi.SonataFlow) {
+               if minAvailable == nil && maxUnavailable == nil {
+                       workflow.Spec.PodTemplate.PodDisruptionBudget = nil
+               } else {
+                       workflow.Spec.PodTemplate.PodDisruptionBudget = 
&operatorapi.PodDisruptionBudgetSpec{
+                               MinAvailable:   minAvailable,
+                               MaxUnavailable: maxUnavailable,
+                       }
+               }
+       })
+       if err != nil {
+               GinkgoT().Fatal(err)
+       }
+}
+
+func setReplicas(workflowFile string, replicas int32) error {
+       return applyWorkflowTransform(workflowFile, func(workflow 
*operatorapi.SonataFlow) {
+               workflow.Spec.PodTemplate.Replicas = &replicas
+       })
+}
+
 func setImageAndReplicas(workflowFile, newImage string, replicas int32) error {
        return applyWorkflowTransform(workflowFile, func(workflow 
*operatorapi.SonataFlow) {
                workflow.Spec.PodTemplate.Container.Image = newImage
@@ -620,3 +692,43 @@ func getWorkflowDefinitionStatus(podName string, 
containerName string, namespace
        }
        return available.(string), true, nil
 }
+
// patchSFPDataIndexReplicas executes a patch command for the dataIndex
// replicas in the given platform. Convenience wrapper around
// patchSFPServiceReplicas.
func patchSFPDataIndexReplicas(sfpName string, ns string, replicas string) error {
	return patchSFPServiceReplicas(sfpName, ns, "dataIndex", replicas)
}
+
// patchSFPJobServiceReplicas executes a patch command for the jobService
// replicas in the given platform. Convenience wrapper around
// patchSFPServiceReplicas. (Comment previously named the dataIndex variant
// by mistake.)
func patchSFPJobServiceReplicas(sfpName string, ns string, replicas string) error {
	return patchSFPServiceReplicas(sfpName, ns, "jobService", replicas)
}
+
+// patchSFPServiceReplicas use any of the patchSFPDataIndexReplicas or 
patchSFPJobServiceReplicas variants.
+func patchSFPServiceReplicas(sfpName string, ns string, serviceField string, 
replicas string) error {
+       patch := 
fmt.Sprintf(`{"spec":{"services":{"%s":{"podTemplate":{"replicas":%s}}}}}`, 
serviceField, replicas)
+       cmd := exec.Command("kubectl", "patch", "sonataflowplatform", sfpName, 
"-n", ns, "--type=merge", "-p", patch)
+       response, err := utils.Run(cmd)
+       if err != nil {
+               return fmt.Errorf("failed to patch the replicas the service: %s 
in SonataFlowPlatform %s/%s, %v", serviceField, ns, sfpName, err)
+       }
+       strResponse := extractFromSingleQuotedResponse(string(response))
+       if !strings.HasPrefix(strResponse, 
fmt.Sprintf("sonataflowplatform.sonataflow.org/%s patched", sfpName)) {
+               return fmt.Errorf("failed to patch the replicas for the 
service: %s in SonataFlowPlatform %s/%s, verify that the platform exists in the 
namespace: %s, %v", serviceField, ns, sfpName, ns, strResponse)
+       }
+       return nil
+}
+
+// patchHPAMinReplicas executes a patch command for the 
HorizontalPodAutoscaler minReplicas.
+func patchHPAMinReplicas(name string, ns string, replicas string) error {
+       patch := fmt.Sprintf(`{"spec":{"minReplicas":%s}}`, replicas)
+       cmd := exec.Command("kubectl", "patch", "horizontalpodautoscaler", 
name, "-n", ns, "--type=merge", "-p", patch)
+       response, err := utils.Run(cmd)
+       if err != nil {
+               return fmt.Errorf("failed to patch the minReplicas for the HPA: 
%s/%s, %v", ns, name, err)
+       }
+       strResponse := extractFromSingleQuotedResponse(string(response))
+       if !strings.HasPrefix(strResponse, 
fmt.Sprintf("horizontalpodautoscaler.autoscaling/%s patched", name)) {
+               return fmt.Errorf("failed to patch the minReplicas for the HPA 
%s/%s, verify that the HPA exists in the namespace: %s, %s", ns, name, ns, 
strResponse)
+       }
+       return nil
+}
diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go 
b/packages/sonataflow-operator/test/e2e/platform_test.go
index 01615e1e331..251f17652d8 100644
--- a/packages/sonataflow-operator/test/e2e/platform_test.go
+++ b/packages/sonataflow-operator/test/e2e/platform_test.go
@@ -24,6 +24,7 @@ import (
        "math/rand"
        "os/exec"
        "path/filepath"
+       "strconv"
        "strings"
        "time"
 
@@ -526,6 +527,146 @@ var _ = Describe("Platform Use Cases :: ", 
Label("platform"), Ordered, func() {
                Entry("and the HPA is created after the platform", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr"), test.GetPathFromE2EDirectory("platform", "hpa", 
"generic-data-index-service-hpa.yaml"), false),
                Entry("and the HPA is created before the platform", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr"), test.GetPathFromE2EDirectory("platform", "hpa", 
"generic-data-index-service-hpa.yaml"), true),
        )
+
+       DescribeTable("when deploying a SonataFlowPlatform with PDB configured 
for the Data Index", Label("data-index-pdb"), func(testcaseDir string, 
dataIndexPatchReplicas string, shouldHavePDBAfterPatch bool, 
shouldHaveDisruptionsAllowedAfterPatch string, 
shouldHaveDisruptionsAllowedNumberAfterPatch int32) {
+               sfpName := "sonataflow-platform"
+               dataIndexServiceName := "sonataflow-platform-data-index-service"
+               expectedPDB := dataIndexServiceName
+
+               By("Deploy the platform and database CRs")
+               EventuallyWithOffset(1, func() error {
+                       return kubectlApplyKustomizeOnCluster(testcaseDir, 
targetNamespace)
+               }, 2*time.Minute, time.Second).Should(Succeed())
+
+               By("Wait for SonataFlowPlatform CR to complete the deployments")
+               EventuallyWithOffset(1, func() error {
+                       return 
verifyDataIndexAndJobsServiceAreReady(targetNamespace, "5s")
+               }, 10*time.Minute, 5).Should(Succeed())
+
+               By("Evaluate status of all service's health endpoint")
+               verifyDataIndexAndJobsServiceAreHealthy(targetNamespace)
+
+               By("Check that the Data Index deployment has 3 replicas as 
expected")
+               EventuallyWithOffset(1, func() bool {
+                       return verifyDeploymentReplicas(dataIndexServiceName, 
targetNamespace, int32(3))
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By("check that the PodDisruptionBudget for the Data Index was 
created")
+               EventuallyWithOffset(1, func() bool {
+                       return verifyResourceExists(dataIndexServiceName, 
"pdb", targetNamespace)
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By("check that the PodDisruptionBudget's DisruptionAllowed 
condition is True")
+               EventuallyWithOffset(1, func() bool {
+                       return 
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace, 
"DisruptionAllowed", "True")
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By("check that the PodDisruptionBudget is allowing 2 
disruptions")
+               EventuallyWithOffset(1, func() bool {
+                       return 
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace, 
int32(2))
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By(fmt.Sprintf("scale the Data Index service to Replicas = %s", 
dataIndexPatchReplicas))
+               EventuallyWithOffset(1, func() error {
+                       return patchSFPDataIndexReplicas(sfpName, 
targetNamespace, dataIndexPatchReplicas)
+               }, 3*time.Minute, 30*time.Second).Should(Succeed())
+
+               if shouldHavePDBAfterPatch {
+                       By("check that the PodDisruptionBudget for the Data 
Index still exists")
+               } else {
+                       By("check that the PodDisruptionBudget for the Data 
Index was removed")
+               }
+               EventuallyWithOffset(1, func() bool {
+                       return verifyResourceExists(dataIndexServiceName, 
"pdb", targetNamespace)
+               }, 3*time.Minute, 
30*time.Second).Should(Equal(shouldHavePDBAfterPatch))
+
+               if shouldHavePDBAfterPatch {
+                       By(fmt.Sprintf("check that the PodDisruptionBudget's 
DisruptionAllowed condition in the expected status = %s", 
shouldHaveDisruptionsAllowedAfterPatch))
+                       EventuallyWithOffset(1, func() bool {
+                               return 
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace, 
"DisruptionAllowed", shouldHaveDisruptionsAllowedAfterPatch)
+                       }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+                       By(fmt.Sprintf("check that the PodDisruptionBudget is 
allowing the disruptions number = %s", 
strconv.Itoa(int(shouldHaveDisruptionsAllowedNumberAfterPatch))))
+                       EventuallyWithOffset(1, func() bool {
+                               return 
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace, 
shouldHaveDisruptionsAllowedNumberAfterPatch)
+                       }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+               }
+       },
+
+               Entry("and the Data Index is rescaled to 4 replicas", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr_with_pdb"), "4", true, "True", int32(3)),
+               Entry("and the Data Index is rescaled to 2 replicas", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr_with_pdb"), "2", true, "True", int32(1)),
+               Entry("and the Data Index is rescaled to 1 replicas", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr_with_pdb"), "1", false, "UnUsed", int32(-1)),
+       )
+
+       DescribeTable("when deploying a SonataFlowPlatform with PDB configured 
for the Data Index combined with HPA", Label("data-index-pdb-with-hpa"), 
func(testcaseDir string, hpaPatchReplicas string, shouldHavePDBAfterPatch bool, 
shouldHaveDisruptionsAllowedAfterPatch string, 
shouldHaveDisruptionsAllowedNumberAfterPatch int32) {
+               dataIndexServiceName := "sonataflow-platform-data-index-service"
+               expectedPDB := dataIndexServiceName
+
+               By("Deploy the platform and database CRs")
+               EventuallyWithOffset(1, func() error {
+                       return kubectlApplyKustomizeOnCluster(testcaseDir, 
targetNamespace)
+               }, 2*time.Minute, time.Second).Should(Succeed())
+
+               By("Wait for SonataFlowPlatform CR to complete the deployments")
+               EventuallyWithOffset(1, func() error {
+                       return 
verifyDataIndexAndJobsServiceAreReady(targetNamespace, "5s")
+               }, 10*time.Minute, 5).Should(Succeed())
+
+               By("Evaluate status of all service's health endpoint")
+               verifyDataIndexAndJobsServiceAreHealthy(targetNamespace)
+
+               By("Check that the Data Index deployment has 2 replicas as 
expected from the initial HPA configuration")
+               EventuallyWithOffset(1, func() bool {
+                       return verifyDeploymentReplicas(dataIndexServiceName, 
targetNamespace, int32(2))
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By("check that the PodDisruptionBudget for the Data Index was 
created")
+               EventuallyWithOffset(1, func() bool {
+                       return verifyResourceExists(dataIndexServiceName, 
"pdb", targetNamespace)
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By("check that the PodDisruptionBudget's DisruptionAllowed 
condition is True")
+               EventuallyWithOffset(1, func() bool {
+                       return 
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace, 
"DisruptionAllowed", "True")
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By("check that the PodDisruptionBudget is allowing 1 
disruption")
+               EventuallyWithOffset(1, func() bool {
+                       return 
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace, 
int32(1))
+               }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+               By(fmt.Sprintf("patch the HPA minReplicas to: %s", 
hpaPatchReplicas))
+               EventuallyWithOffset(1, func() error {
+                       return 
patchHPAMinReplicas("sonataflow-platform-data-index-service-hpa", 
targetNamespace, hpaPatchReplicas)
+               }, 3*time.Minute, 30*time.Second).Should(Succeed())
+
+               if shouldHavePDBAfterPatch {
+                       By("check that the PodDisruptionBudget for the Data 
Index still exists after patching the HPA")
+               } else {
+                       By("check that the PodDisruptionBudget for the Data 
Index was removed after patching the HPA")
+               }
+
+               EventuallyWithOffset(1, func() bool {
+                       return verifyResourceExists(dataIndexServiceName, 
"pdb", targetNamespace)
+               }, 3*time.Minute, 
30*time.Second).Should(Equal(shouldHavePDBAfterPatch))
+
+               if shouldHavePDBAfterPatch {
+                       By(fmt.Sprintf("check that the PodDisruptionBudget's 
DisruptionAllowed condition in the expected status = %s", 
shouldHaveDisruptionsAllowedAfterPatch))
+                       EventuallyWithOffset(1, func() bool {
+                               return 
verifyPodDisruptionBudgetConditionHasStatus(expectedPDB, targetNamespace, 
"DisruptionAllowed", shouldHaveDisruptionsAllowedAfterPatch)
+                       }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+
+                       By(fmt.Sprintf("check that the PodDisruptionBudget is 
allowing the disruptions number = %s", 
strconv.Itoa(int(shouldHaveDisruptionsAllowedNumberAfterPatch))))
+                       EventuallyWithOffset(1, func() bool {
+                               return 
verifyPodDisruptionBudgetAllowsDisruptionNumber(expectedPDB, targetNamespace, 
shouldHaveDisruptionsAllowedNumberAfterPatch)
+                       }, 3*time.Minute, 30*time.Second).Should(BeTrue())
+               }
+       },
+
+               Entry("and the HPA minReplicas is configured with 4", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr_with_pdb_and_hpa"), "4", true, "True", int32(3)),
+               Entry("and the HPA minReplicas is configured with 3", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr_with_pdb_and_hpa"), "3", true, "True", int32(2)),
+               Entry("and the HPA minReplicas is configured with 1", 
test.GetPathFromE2EDirectory("platform", "persistence", 
"generic_from_platform_cr_with_pdb_and_hpa"), "1", false, "UnUsed", int32(-1)),
+       )
 })
 
 func verifyDataIndexAndJobsServiceAreReady(namespace string, timeout string) 
error {
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/01-postgres.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/01-postgres.yaml
new file mode 100644
index 00000000000..89f6e6fc81d
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/01-postgres.yaml
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  labels:
+    app.kubernetes.io/name: postgres
+  name: postgres-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app.kubernetes.io/name: postgres
+  name: postgres
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app.kubernetes.io/name: postgres
+  template:
+    metadata:
+      labels:
+        app.kubernetes.io/name: postgres
+    spec:
+      containers:
+        - name: postgres
+          image: postgres:13.2-alpine
+          imagePullPolicy: "IfNotPresent"
+          ports:
+            - containerPort: 5432
+          volumeMounts:
+            - name: storage
+              mountPath: /var/lib/postgresql/data
+          envFrom:
+            - secretRef:
+                name: postgres-secrets
+          readinessProbe:
+            exec:
+              command: ["pg_isready"]
+            initialDelaySeconds: 15
+            timeoutSeconds: 2
+          livenessProbe:
+            exec:
+              command: ["pg_isready"]
+            initialDelaySeconds: 15
+            timeoutSeconds: 2
+          resources:
+            limits:
+              memory: "256Mi"
+              cpu: "500m"
+      volumes:
+        - name: storage
+          persistentVolumeClaim:
+            claimName: postgres-pvc
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app.kubernetes.io/name: postgres
+  name: postgres
+spec:
+  selector:
+    app.kubernetes.io/name: postgres
+  ports:
+    - port: 5432
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/02-sonataflow_platform.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/02-sonataflow_platform.yaml
new file mode 100644
index 00000000000..d43db8d7cba
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/02-sonataflow_platform.yaml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlowPlatform
+metadata:
+  name: sonataflow-platform
+spec:
+  persistence:
+    postgresql:
+      secretRef:
+        name: postgres-secrets
+        userKey: POSTGRES_USER
+        passwordKey: POSTGRES_PASSWORD
+      serviceRef:
+        name: postgres
+        port: 5432
+        databaseName: sonataflow
+  services:
+    dataIndex:
+      enabled: false
+      persistence:
+        dbMigrationStrategy: service
+      podTemplate:
+        replicas: 3
+        podDisruptionBudget:
+          minAvailable: 1
+        initContainers:
+          - name: init-postgres
+            image: registry.access.redhat.com/ubi9/ubi-micro:latest
+            imagePullPolicy: IfNotPresent
+            command:
+              [
+                "sh",
+                "-c",
+                'until (echo 1 > /dev/tcp/postgres.$(cat 
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
 >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+              ]
+    jobService:
+      enabled: false
+      persistence:
+        dbMigrationStrategy: service
+      podTemplate:
+        initContainers:
+          - name: init-postgres
+            image: registry.access.redhat.com/ubi9/ubi-micro:latest
+            imagePullPolicy: IfNotPresent
+            command:
+              [
+                "sh",
+                "-c",
+                'until (echo 1 > /dev/tcp/postgres.$(cat 
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
 >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+              ]
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/kustomization.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/kustomization.yaml
new file mode 100644
index 00000000000..48d72cbd0b8
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb/kustomization.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+resources:
+  - 01-postgres.yaml
+  - 02-sonataflow_platform.yaml
+
+generatorOptions:
+  disableNameSuffixHash: true
+
+secretGenerator:
+  - name: postgres-secrets
+    literals:
+      - POSTGRES_USER=sonataflow
+      - POSTGRES_PASSWORD=sonataflow
+      - POSTGRES_DATABASE=sonataflow
+      - PGDATA=/var/lib/pgsql/data/userdata
+
+sortOptions:
+  order: fifo
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/01-postgres.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/01-postgres.yaml
new file mode 100644
index 00000000000..89f6e6fc81d
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/01-postgres.yaml
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  labels:
+    app.kubernetes.io/name: postgres
+  name: postgres-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 1Gi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app.kubernetes.io/name: postgres
+  name: postgres
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app.kubernetes.io/name: postgres
+  template:
+    metadata:
+      labels:
+        app.kubernetes.io/name: postgres
+    spec:
+      containers:
+        - name: postgres
+          image: postgres:13.2-alpine
+          imagePullPolicy: "IfNotPresent"
+          ports:
+            - containerPort: 5432
+          volumeMounts:
+            - name: storage
+              mountPath: /var/lib/postgresql/data
+          envFrom:
+            - secretRef:
+                name: postgres-secrets
+          readinessProbe:
+            exec:
+              command: ["pg_isready"]
+            initialDelaySeconds: 15
+            timeoutSeconds: 2
+          livenessProbe:
+            exec:
+              command: ["pg_isready"]
+            initialDelaySeconds: 15
+            timeoutSeconds: 2
+          resources:
+            limits:
+              memory: "256Mi"
+              cpu: "500m"
+      volumes:
+        - name: storage
+          persistentVolumeClaim:
+            claimName: postgres-pvc
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app.kubernetes.io/name: postgres
+  name: postgres
+spec:
+  selector:
+    app.kubernetes.io/name: postgres
+  ports:
+    - port: 5432
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/02-sonataflow_platform.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/02-sonataflow_platform.yaml
new file mode 100644
index 00000000000..799c6554e02
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/02-sonataflow_platform.yaml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlowPlatform
+metadata:
+  name: sonataflow-platform
+spec:
+  persistence:
+    postgresql:
+      secretRef:
+        name: postgres-secrets
+        userKey: POSTGRES_USER
+        passwordKey: POSTGRES_PASSWORD
+      serviceRef:
+        name: postgres
+        port: 5432
+        databaseName: sonataflow
+  services:
+    dataIndex:
+      enabled: false
+      persistence:
+        dbMigrationStrategy: service
+      podTemplate:
+        podDisruptionBudget:
+          minAvailable: 1
+        initContainers:
+          - name: init-postgres
+            image: registry.access.redhat.com/ubi9/ubi-micro:latest
+            imagePullPolicy: IfNotPresent
+            command:
+              [
+                "sh",
+                "-c",
+                'until (echo 1 > /dev/tcp/postgres.$(cat 
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
 >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+              ]
+    jobService:
+      enabled: false
+      persistence:
+        dbMigrationStrategy: service
+      podTemplate:
+        initContainers:
+          - name: init-postgres
+            image: registry.access.redhat.com/ubi9/ubi-micro:latest
+            imagePullPolicy: IfNotPresent
+            command:
+              [
+                "sh",
+                "-c",
+                'until (echo 1 > /dev/tcp/postgres.$(cat 
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
 >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;',
+              ]
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/03-data-index-service-hpa.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/03-data-index-service-hpa.yaml
new file mode 100644
index 00000000000..27c2408dc2f
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/03-data-index-service-hpa.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+apiVersion: autoscaling/v2
+kind: HorizontalPodAutoscaler
+metadata:
+  name: sonataflow-platform-data-index-service-hpa
+spec:
+  scaleTargetRef:
+    apiVersion: apps/v1
+    kind: Deployment
+    name: sonataflow-platform-data-index-service
+  minReplicas: 2
+  maxReplicas: 5
+  metrics:
+    - type: Resource
+      resource:
+        name: cpu
+        target:
+          type: Utilization
+          averageUtilization: 70
+    - type: Resource
+      resource:
+        name: memory
+        target:
+          type: Utilization
+          averageUtilization: 80
diff --git 
a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/kustomization.yaml
 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/kustomization.yaml
new file mode 100644
index 00000000000..c0b08ff093c
--- /dev/null
+++ 
b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr_with_pdb_and_hpa/kustomization.yaml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+resources:
+  - 01-postgres.yaml
+  - 02-sonataflow_platform.yaml
+  - 03-data-index-service-hpa.yaml
+
+generatorOptions:
+  disableNameSuffixHash: true
+
+secretGenerator:
+  - name: postgres-secrets
+    literals:
+      - POSTGRES_USER=sonataflow
+      - POSTGRES_PASSWORD=sonataflow
+      - POSTGRES_DATABASE=sonataflow
+      - PGDATA=/var/lib/pgsql/data/userdata
+
+sortOptions:
+  order: fifo
diff --git a/packages/sonataflow-operator/test/e2e/workflow_test.go 
b/packages/sonataflow-operator/test/e2e/workflow_test.go
index b160b653ff8..620c377e001 100644
--- a/packages/sonataflow-operator/test/e2e/workflow_test.go
+++ b/packages/sonataflow-operator/test/e2e/workflow_test.go
@@ -27,6 +27,8 @@ import (
        "strings"
        "time"
 
+       "k8s.io/apimachinery/pkg/util/intstr"
+
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/test"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/test/utils"
 
@@ -504,3 +506,215 @@ var _ = Describe("Workflow HorizontalPodAutoscaling Use 
Cases :: ", Label("flows
                })
        })
 })
+
// Workflow PodDisruptionBudget e2e suite: verifies that the operator creates a PDB for a
// workflow when replicas > 1 and removes it after scaling down to 1, for both the gitops and
// preview profiles and for both integer and percentage minReplicas values.
var _ = Describe("Workflow PodDisruptionBudget Use Cases :: ", Label("flows-pdb"), Ordered, func() {
	var targetNamespace string
	BeforeEach(func() {
		// Each spec runs in its own randomly-named namespace.
		targetNamespace = fmt.Sprintf("test-%d", rand.Intn(randomIntRange)+1)
		err := kubectlCreateNamespace(targetNamespace)
		Expect(err).NotTo(HaveOccurred())
	})
	AfterEach(func() {
		// Remove resources in test namespace; on failure the namespace is intentionally
		// left in place for debugging.
		if !CurrentSpecReport().Failed() && len(targetNamespace) > 0 {
			cmd := exec.Command("kubectl", "delete", "sonataflow", "--all", "-n", targetNamespace, "--wait")
			_, err := utils.Run(cmd)
			Expect(err).NotTo(HaveOccurred())
			err = kubectlDeleteNamespace(targetNamespace)
			Expect(err).NotTo(HaveOccurred())
		}
	})

	DescribeTable("Ensure PDB works by workflow profile", func(profile string, usePercent bool) {

		By("create the template file")
		// Work on a temporary copy so mutations don't touch the shared testdata file.
		sonataFlowCrFileTemplate := test.GetPathFromE2EDirectory("workflows", prebuiltWorkflows.Greetings.Name, "01-sonataflow.org_v1alpha08_sonataflow.yaml")
		sonataFlowCrFile := createTmpCopy(sonataFlowCrFileTemplate)

		if profile == "gitops" {
			By("setup the workflow file for the gitops profile")
			// gitops uses the prebuilt image tag; preview builds the image itself (empty tag).
			setImageAndReplicasOrFail(sonataFlowCrFile, prebuiltWorkflows.Greetings.Tag, int32(4))
			setProfileOrFail(sonataFlowCrFile, "gitops")
		} else {
			By("setup the workflow file for the preview profile")
			setImageAndReplicasOrFail(sonataFlowCrFile, "", int32(4))
			setProfileOrFail(sonataFlowCrFile, "preview")
		}

		var minReplicas intstr.IntOrString
		if usePercent {
			By("setup the workflow podDisruptionBudget using 50% minReplicas")
			// 50% of 4 replicas resolves to the same minAvailable as the integer case below.
			minReplicas = intstr.FromString("50%")
		} else {
			By("setup the workflow podDisruptionBudget using 2 minReplicas")
			minReplicas = intstr.FromInt32(int32(2))
		}
		setPodDisruptionBudgetOrFail(sonataFlowCrFile, &minReplicas, nil)

		By("deploy the workflow")
		EventuallyWithOffset(1, func() error {
			return kubectlApplyFileOnCluster(sonataFlowCrFile, targetNamespace)
		}, 3*time.Minute, time.Second).Should(Succeed())

		By("check the workflow is in running state")
		EventuallyWithOffset(1, func() bool { return verifyWorkflowIsInRunningState(prebuiltWorkflows.Greetings.Name, targetNamespace) }, 15*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the workflow has 4 replica")
		EventuallyWithOffset(1, func() bool {
			return verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(4))
		}, 2*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget was created")
		EventuallyWithOffset(1, func() bool {
			return verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget's DisruptionAllowed condition is True")
		EventuallyWithOffset(1, func() bool {
			return verifyPodDisruptionBudgetConditionHasStatus(prebuiltWorkflows.Greetings.Name, targetNamespace, "DisruptionAllowed", "True")
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget is allowing 2 disruptions")
		// 4 replicas with minAvailable 2 (or 50% == 2) leaves exactly 2 allowed disruptions.
		EventuallyWithOffset(1, func() bool {
			return verifyPodDisruptionBudgetAllowsDisruptionNumber(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(2))
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("scale the workflow to 1")
		setReplicasOrFail(sonataFlowCrFile, int32(1))
		EventuallyWithOffset(1, func() error {
			return kubectlApplyFileOnCluster(sonataFlowCrFile, targetNamespace)
		}, 3*time.Minute, time.Second).Should(Succeed())

		By("check that the workflow has 1 replica")
		EventuallyWithOffset(1, func() bool {
			return verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(1))
		}, 2*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget is removed after scaling the workflow to 1")
		// The operator is expected to delete the PDB once only one replica remains.
		EventuallyWithOffset(1, func() bool {
			return verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
		}, 3*time.Minute, 30*time.Second).Should(BeFalse())
	},
		Entry("Workflow with gitops profile and disruption budget with minReplicas: 50%", "gitops", true),
		Entry("Workflow with gitops profile and disruption budget with minReplicas: 2", "gitops", false),
		Entry("Workflow with preview profile and disruption budget with minReplicas: 50%", "preview", true),
		Entry("Workflow with preview profile and disruption budget with minReplicas: 2", "preview", false),
	)
})
+
// Workflow PodDisruptionBudget + HPA e2e suite: verifies PDB lifecycle when a
// HorizontalPodAutoscaler (rather than the workflow CR) controls the replica count — the PDB
// must appear once the HPA scales the workflow above 1 replica and disappear after the HPA is
// deleted and the workflow is scaled back to 1.
var _ = Describe("Workflow PodDisruptionBudget + HorizontalPodAutoscaler Use Cases :: ", Label("flows-pdb-with-hpa"), Ordered, func() {
	var targetNamespace string
	BeforeEach(func() {
		// Each spec runs in its own randomly-named namespace.
		targetNamespace = fmt.Sprintf("test-%d", rand.Intn(randomIntRange)+1)
		err := kubectlCreateNamespace(targetNamespace)
		Expect(err).NotTo(HaveOccurred())
	})
	AfterEach(func() {
		// Remove resources in test namespace; on failure the namespace is intentionally
		// left in place for debugging.
		if !CurrentSpecReport().Failed() && len(targetNamespace) > 0 {
			cmd := exec.Command("kubectl", "delete", "sonataflow", "--all", "-n", targetNamespace, "--wait")
			_, err := utils.Run(cmd)
			Expect(err).NotTo(HaveOccurred())
			err = kubectlDeleteNamespace(targetNamespace)
			Expect(err).NotTo(HaveOccurred())
		}
	})

	DescribeTable("Ensure PDB works by workflow profile when using a HPA", func(profile string, usePercent bool) {

		By("create the template file")
		// Work on a temporary copy so mutations don't touch the shared testdata file.
		sonataFlowCrFileTemplate := test.GetPathFromE2EDirectory("workflows", prebuiltWorkflows.Greetings.Name, "01-sonataflow.org_v1alpha08_sonataflow.yaml")
		sonataFlowCrFile := createTmpCopy(sonataFlowCrFileTemplate)

		if profile == "gitops" {
			By("setup the workflow file for the gitops profile and 1 replica")
			// gitops uses the prebuilt image tag; preview builds the image itself (empty tag).
			setImageAndReplicasOrFail(sonataFlowCrFile, prebuiltWorkflows.Greetings.Tag, int32(1))
			setProfileOrFail(sonataFlowCrFile, "gitops")
		} else {
			By("setup the workflow file for the preview profile and 1 replica")
			setImageAndReplicasOrFail(sonataFlowCrFile, "", int32(1))
			setProfileOrFail(sonataFlowCrFile, "preview")
		}

		var minReplicas intstr.IntOrString
		if usePercent {
			By("setup the workflow podDisruptionBudget using 30% minReplicas")
			// 30% of the HPA's 3 replicas rounds up to the same minAvailable as the integer case.
			minReplicas = intstr.FromString("30%")
		} else {
			By("setup the workflow podDisruptionBudget using 1 minReplicas")
			minReplicas = intstr.FromInt32(int32(1))
		}
		setPodDisruptionBudgetOrFail(sonataFlowCrFile, &minReplicas, nil)

		By("deploy the workflow")
		EventuallyWithOffset(1, func() error {
			return kubectlApplyFileOnCluster(sonataFlowCrFile, targetNamespace)
		}, 3*time.Minute, time.Second).Should(Succeed())

		By("check the workflow is in running state")
		EventuallyWithOffset(1, func() bool { return verifyWorkflowIsInRunningState(prebuiltWorkflows.Greetings.Name, targetNamespace) }, 15*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the workflow has 1 replica")
		EventuallyWithOffset(1, func() bool {
			return verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(1))
		}, 2*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that no PodDisruptionBudget was created since the workflow has 1 replica")
		EventuallyWithOffset(1, func() bool {
			return verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
		}, 3*time.Minute, 30*time.Second).Should(BeFalse())

		By("create the HPA to manage the workflow replicas and configured to ensure 3 min replicas")
		sonataFlowCrFileHPA := test.GetPathFromE2EDirectory("workflows", prebuiltWorkflows.Greetings.Name, "01-sonataflow.org_v1alpha08_sonataflow_hpa.yaml")
		EventuallyWithOffset(1, func() error {
			return kubectlApplyFileOnCluster(sonataFlowCrFileHPA, targetNamespace)
		}, 3*time.Minute, time.Second).Should(Succeed())

		By("check that the workflow has 3 replicas after creating the HPA")
		EventuallyWithOffset(1, func() bool {
			return verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(3))
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget was created after creating the HPA")
		EventuallyWithOffset(1, func() bool {
			return verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget's DisruptionAllowed condition is True")
		EventuallyWithOffset(1, func() bool {
			return verifyPodDisruptionBudgetConditionHasStatus(prebuiltWorkflows.Greetings.Name, targetNamespace, "DisruptionAllowed", "True")
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget is allowing 2 disruptions")
		// 3 replicas with minAvailable 1 (or 30% == 1) leaves exactly 2 allowed disruptions.
		EventuallyWithOffset(1, func() bool {
			return verifyPodDisruptionBudgetAllowsDisruptionNumber(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(2))
		}, 3*time.Minute, 30*time.Second).Should(BeTrue())

		By("delete the HPA")
		// Deleting the HPA hands replica control back to the workflow CR.
		EventuallyWithOffset(1, func() error {
			return kubectlDeleteFileOnCluster(sonataFlowCrFileHPA, targetNamespace)
		}, 3*time.Minute, time.Second).Should(Succeed())

		By("scale the workflow to 1")
		setReplicasOrFail(sonataFlowCrFile, int32(1))
		EventuallyWithOffset(1, func() error {
			return kubectlApplyFileOnCluster(sonataFlowCrFile, targetNamespace)
		}, 3*time.Minute, time.Second).Should(Succeed())

		By("check that the workflow has 1 replica")
		EventuallyWithOffset(1, func() bool {
			return verifyWorkflowReplicas(prebuiltWorkflows.Greetings.Name, targetNamespace, int32(1))
		}, 2*time.Minute, 30*time.Second).Should(BeTrue())

		By("check that the PodDisruptionBudget is removed after scaling the workflow to 1")
		EventuallyWithOffset(1, func() bool {
			return verifyResourceExists(prebuiltWorkflows.Greetings.Name, "pdb", targetNamespace)
		}, 3*time.Minute, 30*time.Second).Should(BeFalse())

	},
		Entry("Workflow with gitops profile and disruption budget with minReplicas: 30%", "gitops", true),
		Entry("Workflow with gitops profile and disruption budget with minReplicas: 1", "gitops", false),
		Entry("Workflow with preview profile and disruption budget with minReplicas: 30%", "preview", true),
		Entry("Workflow with preview profile and disruption budget with minReplicas: 1", "preview", false),
	)
})
diff --git a/packages/sonataflow-operator/utils/kubernetes/autoscaling.go 
b/packages/sonataflow-operator/utils/kubernetes/autoscaling.go
new file mode 100644
index 00000000000..d5274b26dca
--- /dev/null
+++ b/packages/sonataflow-operator/utils/kubernetes/autoscaling.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package kubernetes
+
+import (
+       "context"
+       "reflect"
+
+       v1 "k8s.io/api/core/v1"
+       "k8s.io/klog/v2"
+
+       autoscalingv2 "k8s.io/api/autoscaling/v2"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
// FindHPAForDeployment returns the HorizontalPodAutoscaler targeting a deployment in a given
// namespace, or nil if it doesn't exist.
// Note: By k8s definition, the HorizontalPodAutoscaler must belong to the same namespace as
// the managed deployment, so only that namespace is searched.
func FindHPAForDeployment(ctx context.Context, c client.Client, namespace string, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
	return findHPAForTarget(ctx, c, namespace, "apps/v1", "Deployment", name)
}
+
// FindHPAForWorkflow returns the HorizontalPodAutoscaler targeting a workflow in a given
// namespace, or nil if it doesn't exist.
// Note: By k8s definition, the HorizontalPodAutoscaler must belong to the same namespace as
// the managed workflow, so only that namespace is searched.
func FindHPAForWorkflow(ctx context.Context, c client.Client, namespace string, name string) (*autoscalingv2.HorizontalPodAutoscaler, error) {
	return findHPAForTarget(ctx, c, namespace, "sonataflow.org/v1alpha08", "SonataFlow", name)
}
+
+func findHPAForTarget(ctx context.Context, c client.Client, namespace string, 
apiVersion string, kind string, name string) 
(*autoscalingv2.HorizontalPodAutoscaler, error) {
+       klog.V(log.D).Infof("Querying HorizontalPodAutoscalers in namespace: 
%s.", namespace)
+       var hpaList autoscalingv2.HorizontalPodAutoscalerList
+       if err := c.List(ctx, &hpaList, client.InNamespace(namespace)); err != 
nil {
+               return nil, err
+       }
+       klog.V(log.D).Infof("Total number of returned HorizontalPodAutoscalers 
is: %d.", len(hpaList.Items))
+       for _, hpa := range hpaList.Items {
+               ref := hpa.Spec.ScaleTargetRef
+               klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler name: 
%s, ref.APIVersion: %s, ref.Kind: %s, ref.Name: %s, targets apiVersion: %s, 
kind: %s, name: %s.", hpa.Name, ref.APIVersion, ref.Kind, ref.Name, apiVersion, 
kind, name)
+               if ref.Kind == kind && ref.Name == name && ref.APIVersion == 
apiVersion {
+                       klog.V(log.D).Infof("HorizontalPodAutoscaler name: %s 
targets apiVersion: %s, kind: %s, name: %s.", hpa.Name, apiVersion, kind, name)
+                       return &hpa, nil
+               }
+       }
+       klog.V(log.D).Infof("No HorizontalPodAutoscaler targets apiVersion: %s, 
kind: %s, name: %s.", apiVersion, kind, name)
+       return nil, nil
+}
+
+// HPAIsActive returns true if the HorizontalPodAutoscaler is active.
+func HPAIsActive(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
+       klog.V(log.D).Infof("Checking if HorizontalPodAutoscaler is Active.")
+       for _, cond := range hpa.Status.Conditions {
+               klog.V(log.D).Infof("Checking Status condition type: %s, %s.", 
cond.Type, cond.Status)
+               if cond.Type == autoscalingv2.ScalingActive {
+                       return cond.Status == v1.ConditionTrue
+               }
+       }
+       return false
+}
+
+// HPAIsWorking returns true if the HorizontalPodAutoscaler has started to 
take care of the scaling for the
+// corresponding target ref. At this point, our controllers must transfer the 
control to the HorizontalPodAutoscaler
+// and let it manage the replicas.
+func HPAIsWorking(hpa *autoscalingv2.HorizontalPodAutoscaler) bool {
+       return HPAIsActive(hpa) || hpa.Status.DesiredReplicas > 0
+}
+
+// HPAEqualsBySpec returns true if to HorizontalPodAutoscaler has the same 
Spec, false in any other case.
+func HPAEqualsBySpec(hpa1, hpa2 *autoscalingv2.HorizontalPodAutoscaler) bool {
+       return reflect.DeepEqual(hpa1, hpa2)
+}
+
+// IsHPAndTargetsAKind returns (*autoscalingv2.HorizontalPodAutoscaler, true) 
if the object is a HorizontalPodAutoscaler
+// and targets a given kind, (nil, false) in other cases.
+func IsHPAndTargetsAKind(obj client.Object, kind string) 
(*autoscalingv2.HorizontalPodAutoscaler, bool) {
+       if hpa, ok := obj.(*autoscalingv2.HorizontalPodAutoscaler); ok {
+               if hpa != nil && hpa.Spec.ScaleTargetRef.Kind == kind {
+                       return hpa, true
+               }
+       }
+       return nil, false
+}
+
// IsHPAndTargetsADeployment returns (*autoscalingv2.HorizontalPodAutoscaler, true) if the
// object is a HorizontalPodAutoscaler and targets a Deployment, (nil, false) in other cases.
func IsHPAndTargetsADeployment(obj client.Object) (*autoscalingv2.HorizontalPodAutoscaler, bool) {
	return IsHPAndTargetsAKind(obj, "Deployment")
}
+
// IsHPAndTargetsASonataFlow returns (*autoscalingv2.HorizontalPodAutoscaler, true) if the
// object is a HorizontalPodAutoscaler and targets a SonataFlow, (nil, false) in other cases.
func IsHPAndTargetsASonataFlow(obj client.Object) (*autoscalingv2.HorizontalPodAutoscaler, bool) {
	return IsHPAndTargetsAKind(obj, "SonataFlow")
}
+
// IsHPAndTargetsASonataFlowAsBool returns true if the object is a HorizontalPodAutoscaler and
// targets a SonataFlow, false in other cases. Convenience boolean form of
// IsHPAndTargetsASonataFlow for use as a predicate (e.g. in event filters).
func IsHPAndTargetsASonataFlowAsBool(obj client.Object) bool {
	_, ok := IsHPAndTargetsAKind(obj, "SonataFlow")
	return ok
}
+
// HPAMinReplicasIsGreaterThan returns true if the HorizontalPodAutoscaler's configured
// minReplicas is != nil and greater than the given value. False in any other case
// (including when minReplicas is unset).
func HPAMinReplicasIsGreaterThan(hpa *autoscalingv2.HorizontalPodAutoscaler, value int32) bool {
	return hpa.Spec.MinReplicas != nil && *hpa.Spec.MinReplicas > value
}
diff --git a/packages/sonataflow-operator/utils/kubernetes/deployment.go 
b/packages/sonataflow-operator/utils/kubernetes/deployment.go
index 775a35e8b81..08c04b9612a 100644
--- a/packages/sonataflow-operator/utils/kubernetes/deployment.go
+++ b/packages/sonataflow-operator/utils/kubernetes/deployment.go
@@ -204,3 +204,15 @@ func AddOrReplaceContainer(containerName string, container 
v1.Container, podSpec
                podSpec.Containers[idx] = container
        }
 }
+
// DeploymentReplicasIsGreaterThan returns true if the Deployment's configured replicas is
// != nil and greater than the given value. False in any other case (including when
// replicas is unset).
func DeploymentReplicasIsGreaterThan(deployment *appsv1.Deployment, value int32) bool {
	return deployment.Spec.Replicas != nil && *deployment.Spec.Replicas > value
}
+
+// DeploymentIsScaledToZero returns true if the Deployment has been explicitly 
scaled to zero, i.e., by setting
+// the replicas == 0. False in any other case, including when replicas == nil.
+func DeploymentIsScaledToZero(deployment *appsv1.Deployment) bool {
+       return deployment.Spec.Replicas != nil && *deployment.Spec.Replicas == 
int32(0)
+}
diff --git a/packages/sonataflow-operator/utils/kubernetes/disruption_budget.go 
b/packages/sonataflow-operator/utils/kubernetes/disruption_budget.go
new file mode 100644
index 00000000000..348e256596a
--- /dev/null
+++ b/packages/sonataflow-operator/utils/kubernetes/disruption_budget.go
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kubernetes
+
+import (
+       "context"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
+       policyv1 "k8s.io/api/policy/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/klog/v2"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+// IsEmptyPodDisruptionBudgetSpec returns true if the PodDisruptionBudgetSpec 
is nil or has no configured values at all,
+// false in any other case.
+func IsEmptyPodDisruptionBudgetSpec(spec *operatorapi.PodDisruptionBudgetSpec) 
bool {
+       return spec == nil || (spec.MinAvailable == nil && spec.MaxUnavailable 
== nil)
+}
+
+// ApplyPodDisruptionBudgetSpec applies an operatorapi.PodDisruptionBudgetSpec 
to the PodDisruptionBudget.
+func ApplyPodDisruptionBudgetSpec(pdb *policyv1.PodDisruptionBudget, spec 
*operatorapi.PodDisruptionBudgetSpec) {
+       if spec.MinAvailable != nil {
+               pdb.Spec.MinAvailable = spec.MinAvailable
+               pdb.Spec.MaxUnavailable = nil
+       } else {
+               pdb.Spec.MaxUnavailable = spec.MaxUnavailable
+               pdb.Spec.MinAvailable = nil
+       }
+}
+
+// SafeDeletePodDisruptionBudget deletes a potentially existing 
PodDisruptionBudget, ignoring the not existing error.
+func SafeDeletePodDisruptionBudget(ctx context.Context, c client.Client, 
namespace, name string) error {
+       err := c.Delete(ctx, &policyv1.PodDisruptionBudget{
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: namespace,
+                       Name:      name,
+               },
+       })
+       if err != nil {
+               if errors.IsNotFound(err) {
+                       klog.V(log.D).Infof("PodDisruptionBudget %s/%s was 
already deleted or never existed.", namespace, name)
+                       return nil
+               } else {
+                       return err
+               }
+       }
+       return nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to