This is an automated email from the ASF dual-hosted git repository.
vishesh pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/cloudstack-kubernetes-provider.git
The following commit(s) were added to refs/heads/main by this push:
new 1b75921b Associate loadBalancerIP to the network when specified in the
Spec (#82)
1b75921b is described below
commit 1b75921b8778d4ae78e04446ef069d6a72248837
Author: Vishesh <[email protected]>
AuthorDate: Fri Dec 12 13:00:02 2025 +0530
Associate loadBalancerIP to the network when specified in the Spec (#82)
---
cloudstack.go | 101 +++++++++++++++++++++++++++++++++++++++++++++
cloudstack_loadbalancer.go | 92 +++++++++++++++++++++++++++++++++++------
deployment.yaml | 1 +
3 files changed, 181 insertions(+), 13 deletions(-)
diff --git a/cloudstack.go b/cloudstack.go
index 92c31b96..2294de6f 100644
--- a/cloudstack.go
+++ b/cloudstack.go
@@ -21,17 +21,23 @@ package cloudstack
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
+ "time"
"github.com/apache/cloudstack-go/v2/cloudstack"
"github.com/blang/semver/v4"
"gopkg.in/gcfg.v1"
+
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
)
@@ -329,6 +335,101 @@ func (cs *CSCloud) getNodeNameFromPod(ctx
context.Context) (string, error) {
return pod.Spec.NodeName, nil
}
+// setServiceAnnotation updates a service annotation using the Kubernetes
client.
+// It uses a patch operation with retry logic to handle concurrent updates
safely.
+func (cs *CSCloud) setServiceAnnotation(ctx context.Context, service
*corev1.Service, key, value string) error {
+ if cs.clientBuilder == nil {
+ klog.V(4).Infof("Client builder not available, skipping
annotation update for service %s/%s", service.Namespace, service.Name)
+ return nil
+ }
+
+ client, err := cs.clientBuilder.Client("cloud-controller-manager")
+ if err != nil {
+ return fmt.Errorf("failed to get Kubernetes client: %v", err)
+ }
+
+ // First, check if the annotation already has the correct value to
avoid unnecessary updates
+ svc, err := client.CoreV1().Services(service.Namespace).Get(ctx,
service.Name, metav1.GetOptions{})
+ if err != nil {
+ if apierrors.IsNotFound(err) {
+ klog.V(4).Infof("Service %s/%s not found, skipping
annotation update", service.Namespace, service.Name)
+ return nil
+ }
+ return fmt.Errorf("failed to get service: %v", err)
+ }
+
+ // Check if annotation already has the correct value
+ if svc.Annotations != nil {
+ if existingValue, exists := svc.Annotations[key]; exists &&
existingValue == value {
+ klog.V(4).Infof("Annotation %s already set to %s for
service %s/%s", key, value, service.Namespace, service.Name)
+ return nil
+ }
+ }
+
+ // Use patch operation with retry logic to handle concurrent updates
+ return cs.patchServiceAnnotation(ctx, client, service.Namespace,
service.Name, key, value)
+}
+
+// patchServiceAnnotation patches a service annotation using a JSON merge
patch with retry logic.
+// This method handles concurrent updates safely by retrying on conflicts.
+func (cs *CSCloud) patchServiceAnnotation(ctx context.Context, client
kubernetes.Interface, namespace, name, key, value string) error {
+ const maxRetries = 3
+ const retryDelay = 500 * time.Millisecond
+
+ // Prepare the patch payload - merge patch that updates only the
specific annotation
+ // JSON merge patch will preserve other annotations while
updating/adding this one
+ patchData := map[string]interface{}{
+ "metadata": map[string]interface{}{
+ "annotations": map[string]string{
+ key: value,
+ },
+ },
+ }
+
+ patchBytes, err := json.Marshal(patchData)
+ if err != nil {
+ return fmt.Errorf("failed to marshal patch data: %v", err)
+ }
+
+ for attempt := 0; attempt < maxRetries; attempt++ {
+ // Apply the patch using JSON merge patch type
+ // This is atomic and avoids race conditions by merging with
existing annotations
+ _, err = client.CoreV1().Services(namespace).Patch(
+ ctx,
+ name,
+ types.MergePatchType,
+ patchBytes,
+ metav1.PatchOptions{},
+ )
+
+ if err == nil {
+ klog.V(4).Infof("Successfully set annotation %s=%s on
service %s/%s", key, value, namespace, name)
+ return nil
+ }
+
+ // Handle conflict errors with retry logic
+ if apierrors.IsConflict(err) {
+ if attempt < maxRetries-1 {
+ klog.V(4).Infof("Conflict updating service
%s/%s annotation, retrying (attempt %d/%d): %v", namespace, name, attempt+1,
maxRetries, err)
+ time.Sleep(retryDelay)
+ continue
+ }
+ return fmt.Errorf("failed to update service annotation
after %d retries due to conflicts: %v", maxRetries, err)
+ }
+
+ // Handle not found errors
+ if apierrors.IsNotFound(err) {
+ klog.V(4).Infof("Service %s/%s not found during patch,
skipping annotation update", namespace, name)
+ return nil
+ }
+
+ // For other errors, return immediately
+ return fmt.Errorf("failed to patch service annotation: %v", err)
+ }
+
+ return fmt.Errorf("failed to update service annotation after %d
attempts", maxRetries)
+}
+
func (cs *CSCloud) getRegionFromZone(zone string) string {
if cs.region != "" {
return cs.region
diff --git a/cloudstack_loadbalancer.go b/cloudstack_loadbalancer.go
index 98de2fbd..ffbdd7cd 100644
--- a/cloudstack_loadbalancer.go
+++ b/cloudstack_loadbalancer.go
@@ -51,19 +51,25 @@ const (
// The CIDR list is a comma-separated list of CIDR ranges (e.g.,
"10.0.0.0/8,192.168.1.0/24").
// If not specified, the default is to allow all sources ("0.0.0.0/0").
ServiceAnnotationLoadBalancerSourceCidrs =
"service.beta.kubernetes.io/cloudstack-load-balancer-source-cidrs"
+
+ // ServiceAnnotationLoadBalancerIPAssociatedByController indicates that
the controller
+ // associated the IP address. This annotation is set by the controller
when it associates
+ // an unallocated IP, and is used to determine if the IP should be
disassociated on deletion.
+ ServiceAnnotationLoadBalancerIPAssociatedByController =
"service.beta.kubernetes.io/cloudstack-load-balancer-ip-associated-by-controller"
//nolint:gosec
)
type loadBalancer struct {
*cloudstack.CloudStackClient
- name string
- algorithm string
- hostIDs []string
- ipAddr string
- ipAddrID string
- networkID string
- projectID string
- rules map[string]*cloudstack.LoadBalancerRule
+ name string
+ algorithm string
+ hostIDs []string
+ ipAddr string
+ ipAddrID string
+ networkID string
+ projectID string
+ rules map[string]*cloudstack.LoadBalancerRule
+ ipAssociatedByController bool
}
// GetLoadBalancer returns whether the specified load balancer exists, and if
so, what its status is.
@@ -134,6 +140,14 @@ func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context,
clusterName string, s
}
}(lb)
}
+
+ // If the controller associated the IP and it matches the service
spec, set the annotation to persist this information.
+ if lb.ipAssociatedByController && lb.ipAddr ==
service.Spec.LoadBalancerIP {
+ if err := cs.setServiceAnnotation(ctx, service,
ServiceAnnotationLoadBalancerIPAssociatedByController, "true"); err != nil {
+ // Log the error but don't fail - the
annotation is helpful but not critical
+ klog.Warningf("Failed to set annotation on
service %s/%s: %v", service.Namespace, service.Name, err)
+ }
+ }
}
klog.V(4).Infof("Load balancer %v is associated with IP %v", lb.name,
lb.ipAddr)
@@ -360,10 +374,52 @@ func (cs *CSCloud) EnsureLoadBalancerDeleted(ctx
context.Context, clusterName st
}
}
- if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP {
- klog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
- if err := lb.releaseLoadBalancerIP(); err != nil {
- return err
+ if lb.ipAddr != "" {
+ // If the IP was allocated by the controller (not specified in
service spec), release it.
+ if lb.ipAddr != service.Spec.LoadBalancerIP {
+ klog.V(4).Infof("Releasing load balancer IP: %v",
lb.ipAddr)
+ if err := lb.releaseLoadBalancerIP(); err != nil {
+ return err
+ }
+ } else {
+ // If the IP was specified in service spec, check if it
was associated by the controller.
+ // First, check if there's an annotation indicating the
controller associated it.
+ // If so, check whether any other load balancer rules
are still using this IP before disassociating it.
+ shouldDisassociate :=
getBoolFromServiceAnnotation(service,
ServiceAnnotationLoadBalancerIPAssociatedByController, false)
+
+ if shouldDisassociate {
+ // Annotation is set, so check if there are any
other load balancer rules using this IP.
+ // Since we've already deleted all rules for
this service, any remaining rules must belong
+ // to other services. If no other rules exist,
it's safe to disassociate the IP.
+ ip, count, err :=
lb.Address.GetPublicIpAddressByID(lb.ipAddrID)
+ if err != nil {
+ klog.Errorf("Error retrieving IP
address %v for disassociation check: %v", lb.ipAddr, err)
+ shouldDisassociate = false
+ } else if count > 0 && ip.Allocated != "" {
+ p :=
lb.LoadBalancer.NewListLoadBalancerRulesParams()
+ p.SetPublicipid(lb.ipAddrID)
+ p.SetListall(true)
+ if lb.projectID != "" {
+ p.SetProjectid(lb.projectID)
+ }
+ otherRules, err :=
lb.LoadBalancer.ListLoadBalancerRules(p)
+ if err != nil {
+ klog.Errorf("Error checking for
other load balancer rules using IP %v: %v", lb.ipAddr, err)
+ shouldDisassociate = false
+ } else if otherRules.Count > 0 {
+ // Other load balancer rules
are using this IP (other services are using it),
+ // so don't disassociate.
+ shouldDisassociate = false
+ }
+ }
+ }
+
+ if shouldDisassociate {
+ klog.V(4).Infof("Disassociating IP %v that was
associated by the controller", lb.ipAddr)
+ if err := lb.releaseLoadBalancerIP(); err !=
nil {
+ return err
+ }
+ }
}
}
@@ -498,6 +554,7 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP
string) error {
p := lb.Address.NewListPublicIpAddressesParams()
p.SetIpaddress(loadBalancerIP)
+ p.SetAllocatedonly(false)
p.SetListall(true)
if lb.projectID != "" {
@@ -510,12 +567,16 @@ func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP
string) error {
}
if l.Count != 1 {
- return fmt.Errorf("could not find IP address %v",
loadBalancerIP)
+ return fmt.Errorf("could not find IP address %v. Found %d
addresses", loadBalancerIP, l.Count)
}
lb.ipAddr = l.PublicIpAddresses[0].Ipaddress
lb.ipAddrID = l.PublicIpAddresses[0].Id
+ // If the IP is not allocated, associate it.
+ if l.PublicIpAddresses[0].Allocated == "" {
+ return lb.associatePublicIPAddress()
+ }
return nil
}
@@ -544,6 +605,10 @@ func (lb *loadBalancer) associatePublicIPAddress() error {
p.SetProjectid(lb.projectID)
}
+ if lb.ipAddr != "" {
+ p.SetIpaddress(lb.ipAddr)
+ }
+
// Associate a new IP address
r, err := lb.Address.AssociateIpAddress(p)
if err != nil {
@@ -552,6 +617,7 @@ func (lb *loadBalancer) associatePublicIPAddress() error {
lb.ipAddr = r.Ipaddress
lb.ipAddrID = r.Id
+ lb.ipAssociatedByController = true
return nil
}
diff --git a/deployment.yaml b/deployment.yaml
index a5cb01f7..58b318fc 100644
--- a/deployment.yaml
+++ b/deployment.yaml
@@ -61,6 +61,7 @@ rules:
resources:
- services
verbs:
+ - get
- list
- patch
- update