ricardozanini commented on code in PR #283:
URL:
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/283#discussion_r1377531943
##########
controllers/discovery/port_utils.go:
##########
@@ -0,0 +1,108 @@
+package discovery
+
+import (
+ corev1 "k8s.io/api/core/v1"
+)
+
+const (
+ httpProtocol = "http"
+ httpsProtocol = "https"
+ webProtocol = "web"
+ securePort = 443
+ appSecurePort = 8443
+)
+
+func isSecurePort(port int) bool {
+ return port == securePort || port == appSecurePort
+}
+
+// findServicePort returns the best suited ServicePort to connect to a service.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a no-secure port.
+func findServicePort(servicePorts []corev1.ServicePort, customPort string)
*corev1.ServicePort {
+ // customPort is provided and is configured?
+ if len(customPort) > 0 {
+ if result := findServicePortByName(servicePorts, customPort);
result != nil {
+ return result
+ }
+ }
+ // has ssl port?
+ if result := findServicePortByName(servicePorts, httpsProtocol); result
!= nil {
+ return result
+ }
+ // has http port?
+ if result := findServicePortByName(servicePorts, httpProtocol); result
!= nil {
+ return result
+ }
+ // has web port?
+ if result := findServicePortByName(servicePorts, webProtocol); result
!= nil {
+ return result
+ }
+ // by definition a service must always have at least one port, get the
first port.
+ return &servicePorts[0]
+}
+
+func findServicePortByName(ports []corev1.ServicePort, name string)
*corev1.ServicePort {
+ for _, servicePort := range ports {
+ if name == servicePort.Name {
+ return &servicePort
+ }
+ }
+ return nil
+}
+
+func isServicePortSecure(servicePort corev1.ServicePort) bool {
+ return servicePort.Name == httpsProtocol ||
isSecurePort(int(servicePort.Port))
+}
+
+// findContainerPort returns the best suited PortPort to connect to a pod, or
nil if the pod has no ports at all.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a non-secure port.
+func findContainerPort(containerPorts []corev1.ContainerPort, customPort
string) *corev1.ContainerPort {
Review Comment:
Don't we have a query in the discovery URI to help this use case? If not, we
can add like: `kubernetes:services.v1/namespace/svc?port=myCustomPort`
##########
controllers/discovery/port_utils.go:
##########
@@ -0,0 +1,108 @@
+package discovery
+
+import (
+ corev1 "k8s.io/api/core/v1"
+)
+
+const (
+ httpProtocol = "http"
+ httpsProtocol = "https"
+ webProtocol = "web"
+ securePort = 443
+ appSecurePort = 8443
+)
+
+func isSecurePort(port int) bool {
+ return port == securePort || port == appSecurePort
+}
+
+// findServicePort returns the best suited ServicePort to connect to a service.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a no-secure port.
+func findServicePort(servicePorts []corev1.ServicePort, customPort string)
*corev1.ServicePort {
+ // customPort is provided and is configured?
+ if len(customPort) > 0 {
+ if result := findServicePortByName(servicePorts, customPort);
result != nil {
+ return result
+ }
+ }
+ // has ssl port?
+ if result := findServicePortByName(servicePorts, httpsProtocol); result
!= nil {
+ return result
+ }
+ // has http port?
+ if result := findServicePortByName(servicePorts, httpProtocol); result
!= nil {
+ return result
+ }
+ // has web port?
+ if result := findServicePortByName(servicePorts, webProtocol); result
!= nil {
+ return result
+ }
+ // by definition a service must always have at least one port, get the
first port.
+ return &servicePorts[0]
+}
+
+func findServicePortByName(ports []corev1.ServicePort, name string)
*corev1.ServicePort {
+ for _, servicePort := range ports {
+ if name == servicePort.Name {
+ return &servicePort
+ }
+ }
+ return nil
+}
+
+func isServicePortSecure(servicePort corev1.ServicePort) bool {
+ return servicePort.Name == httpsProtocol ||
isSecurePort(int(servicePort.Port))
+}
+
+// findContainerPort returns the best suited PortPort to connect to a pod, or
nil if the pod has no ports at all.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a non-secure port.
+func findContainerPort(containerPorts []corev1.ContainerPort, customPort
string) *corev1.ContainerPort {
+ // containers with no ports are permitted, we must check.
+ if len(containerPorts) == 0 {
+ return nil
+ }
+ // customPort is provided and configured?
+ if len(customPort) > 0 {
+ if result := findContainerPortByName(containerPorts,
customPort); result != nil {
+ return result
+ }
+ }
+ // has ssl port?
+ if result := findContainerPortByName(containerPorts, httpsProtocol);
result != nil {
+ return result
+ }
+ // has http port?
+ if result := findContainerPortByName(containerPorts, httpProtocol);
result != nil {
+ return result
+ }
+ // has web port?
+ if result := findContainerPortByName(containerPorts, webProtocol);
result != nil {
+ return result
+ }
+ // when defined, a ContainerPort must always have containerPort
(Required value)
+ return &containerPorts[0]
+}
+
+func findContainerPortByName(containerPorts []corev1.ContainerPort, name
string) *corev1.ContainerPort {
Review Comment:
For these more generic functions, we can keep in `utils/kubernetes` package.
##########
controllers/discovery/uri_parser.go:
##########
@@ -0,0 +1,134 @@
+package discovery
+
+import (
+ "fmt"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "regexp"
+ "strings"
+)
+
+const (
+ // valid namespace, name, or label name.
+ dns1123LabelFmt string = "[a-z0-9]([-a-z0-9]*[a-z0-9])?"
Review Comment:
Don't we have this in `controller-runtime`? Maybe it's worth doing a little
dig in that module:
https://github.com/kubernetes-sigs/controller-runtime/tree/main/pkg
##########
controllers/discovery/kubernetes_catalog.go:
##########
@@ -0,0 +1,65 @@
+package discovery
+
+import (
+ "context"
+ "fmt"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const (
+ serviceKind = "services"
+ podKind = "pods"
+)
+
+type k8sServiceCatalog struct {
+ Context context.Context
+ Client client.Client
+}
+
+func newK8SServiceCatalog(ctx context.Context, cli client.Client)
k8sServiceCatalog {
+ return k8sServiceCatalog{
+ Context: ctx,
+ Client: cli,
+ }
+}
+
+func (c k8sServiceCatalog) Query(uri ResourceUri, outputFormat string)
(string, error) {
+ switch uri.GVK.Kind {
+ case serviceKind:
+ return c.resolveServiceQuery(uri, outputFormat)
+ case podKind:
+ return c.resolvePodQuery(uri, outputFormat)
+ default:
+ return "", fmt.Errorf("resolution of kind: %s is not yet
implemented", uri.GVK.Kind)
+ }
+}
+
+func (c k8sServiceCatalog) resolveServiceQuery(uri ResourceUri, outputFormat
string) (string, error) {
+ if service, err := findService(c.Context, c.Client, uri.Namespace,
uri.Name); err != nil {
+ return "", err
+ } else if serviceUri, err2 := resolveServiceUri(*service,
uri.GetCustomPort(), outputFormat); err2 != nil {
+ return "", err2
+ } else {
+ return serviceUri, nil
+ }
+}
+
+func (c k8sServiceCatalog) resolvePodQuery(uri ResourceUri, outputFormat
string) (string, error) {
+ if pod, service, err :=
findPodAndReferenceServiceByPodLabels(c.Context, c.Client, uri.Namespace,
uri.Name); err != nil {
+ return "", err
+ } else {
+ if service != nil {
+ if serviceUri, err2 := resolveServiceUri(*service,
uri.GetCustomPort(), outputFormat); err2 != nil {
+ return "", err2
+ } else {
+ return serviceUri, nil
+ }
+ } else {
+ if podUri, err3 := resolvePodUri(pod, "",
uri.GetCustomPort(), outputFormat); err3 != nil {
+ return "", err3
+ } else {
+ return podUri, nil
+ }
+ }
Review Comment:
```suggestion
if serviceUri, err := resolveServiceUri(*service,
uri.GetCustomPort(), outputFormat); err != nil {
return "", err
} else {
return serviceUri, nil
}
} else {
if podUri, err := resolvePodUri(pod, "",
uri.GetCustomPort(), outputFormat); err != nil {
return "", err
} else {
return podUri, nil
}
}
```
Don't need to create new vars in this case.
##########
controllers/discovery/discovery.go:
##########
@@ -0,0 +1,196 @@
+package discovery
+
+import (
+ "context"
+ "fmt"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const (
+ KnativeScheme = "knative"
+ KubernetesScheme = "kubernetes"
+ OpenshiftScheme = "openshift"
+
+ // CustomPortLabel well known label name to select a particular target
port
+ CustomPortLabel = "custom-port"
+
+ // KubernetesDNSAddress use this output format with kubernetes services
and pods to resolve to the corresponding
+ // kubernetes DNS name. see:
https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
+ KubernetesDNSAddress = "KubernetesDNSAddress"
+
+ // KubernetesIPAddress default format, resolves objects addresses to
the corresponding cluster IP address.
+ KubernetesIPAddress = "KubernetesIPAddress"
+
+ // kubernetes groups
+ kubernetesServices = "kubernetes:services.v1"
+ kubernetesPods = "kubernetes:pods.v1"
+ kubernetesDeployments = "kubernetes:deployments.v1.apps"
+ kubernetesStatefulSets = "kubernetes:statefulsets.v1.apps"
+ kubernetesIngresses = "kubernetes:ingresses.v1.networking.k8s.io"
+
+ // knative groups
+ knativeServices = "knative:services.v1.serving.knative.dev"
+
+ // openshift groups
+ openshiftRoutes = "openshift:routes.v1.route.openshift.io"
+ openshiftDeploymentConfigs =
"openshift:deploymentconfigs.v1.apps.openshift.io"
+)
+
+type ResourceUri struct {
+ Scheme string
+ GVK v1.GroupVersionKind
+ Namespace string
+ Name string
+ CustomLabels map[string]string
+}
+
+// ServiceCatalog is the entry point to resolve resource addresses given a
ResourceUri.
+type ServiceCatalog interface {
+ // Query returns the address corresponding to the resource identified
by the uri. In the case of services or pods,
+ // the outputFormat can be used to determine the type of address to
calculate.
+ // If the outputFormat is KubernetesDNSAddress, the returned value for
a service will be like this:
http://my-service.my-namespace.svc.cluster.local:8080,
+ // and the returned value for pod will be like this:
http://10-244-1-135.my-namespace.pod.cluster.local:8080.
+ // If the outputFormat is KubernetesIPAddress, the returned value for
pods and services, and other resource types,
+ // will be like this: http://10.245.1.132:8080
+ Query(uri ResourceUri, outputFormat string) (string, error)
+}
+
+type sonataFlowServiceCatalog struct {
+ kubernetesCatalog ServiceCatalog
+ knativeCatalog ServiceCatalog
+ openshiftCatalog ServiceCatalog
+}
+
+// NewServiceCatalog returns a new ServiceCatalog configured to resolve
kubernetes, knative, and openshift resource addresses.
+func NewServiceCatalog(ctx context.Context, cli client.Client) ServiceCatalog {
+ return &sonataFlowServiceCatalog{
+ kubernetesCatalog: newK8SServiceCatalog(ctx, cli),
+ knativeCatalog: newKnServiceCatalog(ctx, cli),
+ }
+}
+
+func (c *sonataFlowServiceCatalog) Query(uri ResourceUri, outputFormat string)
(string, error) {
+ switch uri.Scheme {
+ case KubernetesScheme:
+ return c.kubernetesCatalog.Query(uri, outputFormat)
+ case KnativeScheme:
+ return "", fmt.Errorf("knative service discovery is not yet
implemened")
+ case OpenshiftScheme:
+ return "", fmt.Errorf("openshift service discovery is not yet
implemented")
+ default:
+ return "", fmt.Errorf("unknonw scheme was provided for service
discovery: %s", uri.Scheme)
Review Comment:
```suggestion
return "", fmt.Errorf("unknown scheme was provided for service
discovery: %s", uri.Scheme)
```
##########
controllers/discovery/port_utils.go:
##########
@@ -0,0 +1,108 @@
+package discovery
+
+import (
+ corev1 "k8s.io/api/core/v1"
+)
+
+const (
+ httpProtocol = "http"
+ httpsProtocol = "https"
+ webProtocol = "web"
+ securePort = 443
+ appSecurePort = 8443
+)
+
+func isSecurePort(port int) bool {
+ return port == securePort || port == appSecurePort
+}
+
+// findServicePort returns the best suited ServicePort to connect to a service.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a no-secure port.
+func findServicePort(servicePorts []corev1.ServicePort, customPort string)
*corev1.ServicePort {
+ // customPort is provided and is configured?
+ if len(customPort) > 0 {
+ if result := findServicePortByName(servicePorts, customPort);
result != nil {
+ return result
+ }
+ }
+ // has ssl port?
+ if result := findServicePortByName(servicePorts, httpsProtocol); result
!= nil {
+ return result
+ }
+ // has http port?
+ if result := findServicePortByName(servicePorts, httpProtocol); result
!= nil {
+ return result
+ }
+ // has web port?
+ if result := findServicePortByName(servicePorts, webProtocol); result
!= nil {
+ return result
+ }
+ // by definition a service must always have at least one port, get the
first port.
+ return &servicePorts[0]
+}
+
+func findServicePortByName(ports []corev1.ServicePort, name string)
*corev1.ServicePort {
+ for _, servicePort := range ports {
+ if name == servicePort.Name {
+ return &servicePort
+ }
+ }
+ return nil
+}
+
+func isServicePortSecure(servicePort corev1.ServicePort) bool {
+ return servicePort.Name == httpsProtocol ||
isSecurePort(int(servicePort.Port))
+}
+
+// findContainerPort returns the best suited PortPort to connect to a pod, or
nil if the pod has no ports at all.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a non-secure port.
+func findContainerPort(containerPorts []corev1.ContainerPort, customPort
string) *corev1.ContainerPort {
+ // containers with no ports are permitted, we must check.
+ if len(containerPorts) == 0 {
+ return nil
+ }
+ // customPort is provided and configured?
+ if len(customPort) > 0 {
+ if result := findContainerPortByName(containerPorts,
customPort); result != nil {
+ return result
+ }
+ }
+ // has ssl port?
+ if result := findContainerPortByName(containerPorts, httpsProtocol);
result != nil {
+ return result
+ }
+ // has http port?
+ if result := findContainerPortByName(containerPorts, httpProtocol);
result != nil {
+ return result
+ }
+ // has web port?
+ if result := findContainerPortByName(containerPorts, webProtocol);
result != nil {
+ return result
+ }
+ // when defined, a ContainerPort must always have containerPort
(Required value)
+ return &containerPorts[0]
+}
+
+func findContainerPortByName(containerPorts []corev1.ContainerPort, name
string) *corev1.ContainerPort {
+ for _, containerPort := range containerPorts {
+ if name == containerPort.Name {
+ return &containerPort
+ }
+ }
+ return nil
+}
+
+func findContainerByName(containers []corev1.Container, name string)
*corev1.Container {
Review Comment:
You can use this:
https://github.com/apache/incubator-kie-kogito-serverless-operator/blob/main/utils/kubernetes/deployment.go#L102
##########
controllers/discovery/uri_utils.go:
##########
@@ -0,0 +1,94 @@
+package discovery
+
+import (
+ "fmt"
+ corev1 "k8s.io/api/core/v1"
+ "strings"
+)
+
+func resolveServiceUri(service corev1.Service, customPort string, outputFormat
string) (string, error) {
+ var port int
+ var protocol string
+ var host string
+ var err error = nil
+
+ switch service.Spec.Type {
+ case corev1.ServiceTypeExternalName:
+ // ExternalName may not work properly with SSL:
+ //
https://kubernetes.io/docs/concepts/services-networking/service/#externalname
+ protocol = httpProtocol
+ host = service.Spec.ExternalName
+ port = 80
+ case corev1.ServiceTypeClusterIP:
+ protocol, host, port =
resolveClusterIPOrTypeNodeServiceUriParams(service, customPort)
+ case corev1.ServiceTypeNodePort:
+ protocol, host, port =
resolveClusterIPOrTypeNodeServiceUriParams(service, customPort)
+ case corev1.ServiceTypeLoadBalancer:
+ err = fmt.Errorf("%s type is not yet supported",
service.Spec.Type)
+ default:
+ err = fmt.Errorf("%s type is not yet supported",
service.Spec.Type)
+ }
+ if err != nil {
+ return "", err
+ }
+ if outputFormat == KubernetesDNSAddress {
+ return buildKubernetesServiceDNSUri(protocol,
service.Namespace, service.Name, port), nil
+ } else {
+ return buildURI(protocol, host, port), nil
+ }
+}
+
+// resolveClusterIPOrTypeNodeServiceUriParams returns the uri parameters for a
service of type ClusterIP or TypeNode.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a no-secure port.
+func resolveClusterIPOrTypeNodeServiceUriParams(service corev1.Service,
customPort string) (protocol string, host string, port int) {
+ servicePort := findServicePort(service.Spec.Ports, customPort)
+ if isServicePortSecure(*servicePort) {
+ protocol = httpsProtocol
+ } else {
+ protocol = httpProtocol
+ }
+ host = service.Spec.ClusterIP
+ port = int(servicePort.Port)
+ return protocol, host, port
+}
+
+func resolvePodUri(pod *corev1.Pod, customContainer, customPort string,
outputFormat string) (string, error) {
+ if podIp := pod.Status.PodIP; len(podIp) == 0 {
+ return "", fmt.Errorf("pod: %s in namespace: %s, has no
allocated address", pod.Name, pod.Namespace)
+ } else {
+ var container *corev1.Container
+ if len(customContainer) > 0 {
+ container = findContainerByName(pod.Spec.Containers,
customContainer)
+ }
+ if container == nil {
+ container = &pod.Spec.Containers[0]
+ }
+ if containerPort := findContainerPort(container.Ports,
customPort); containerPort == nil {
+ return "", fmt.Errorf("no container port was found for
pod: %s in namespace: %s", pod.Name, pod.Namespace)
+ } else {
+ protocol := httpProtocol
+ if isSecure := isContainerPortSecure(*containerPort);
isSecure {
+ protocol = httpsProtocol
+ }
+ if outputFormat == KubernetesDNSAddress {
+ return buildKubernetesPodDNSUri(protocol,
pod.Namespace, podIp, int(containerPort.ContainerPort)), nil
+ } else {
+ return buildURI(protocol, podIp,
int(containerPort.ContainerPort)), nil
+ }
+ }
+ }
+}
+
+func buildURI(scheme string, host string, port int) string {
+ return fmt.Sprintf("%s://%s:%v", scheme, host, port)
+}
+
+func buildKubernetesServiceDNSUri(scheme string, namespace string, name
string, port int) string {
+ return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%v", scheme, name,
namespace, port)
+}
+
+func buildKubernetesPodDNSUri(scheme string, namespace string, podIP string,
port int) string {
+ hyphenedIp := strings.Replace(podIP, ".", "-", -1)
+ return fmt.Sprintf("%s://%s.%s.pod.cluster.local:%v", scheme,
hyphenedIp, namespace, port)
Review Comment:
Same thing here. We can stop at `pod`. Do a few tests on your end, I think
it will work fine.
##########
controllers/discovery/queries.go:
##########
@@ -0,0 +1,93 @@
+package discovery
+
+import (
+ "context"
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ networkingV1 "k8s.io/api/networking/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const podTemplateHashLabel = "pod-template-hash"
+
+// findService finds a service by name in the given namespace.
+func findService(ctx context.Context, cli client.Client, namespace string,
name string) (*corev1.Service, error) {
+ service := &corev1.Service{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), service); err
!= nil {
+ return nil, err
+ }
+ return service, nil
+}
+
+// findServiceByLabels finds a service by a set of matching labels in the
given namespace.
+func findServiceByLabels(ctx context.Context, cli client.Client, namespace
string, labels map[string]string) (*corev1.ServiceList, error) {
+ serviceList := &corev1.ServiceList{}
+ if err := cli.List(ctx, serviceList, client.InNamespace(namespace),
client.MatchingLabels(labels)); err != nil {
+ return nil, err
+ }
+ return serviceList, nil
+}
+
+// findPod finds a pod by name in the given namespace.
+func findPod(ctx context.Context, cli client.Client, namespace string, name
string) (*corev1.Pod, error) {
+ pod := &corev1.Pod{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), pod); err !=
nil {
+ return nil, err
+ }
+ return pod, nil
+}
+
+// findPodAndReferenceServiceByPodLabels finds a pod by name in the given
namespace at the same time it piggybacks it's
+// reference service if any. The reference service is determined by using the
same set of labels as the pod.
+func findPodAndReferenceServiceByPodLabels(ctx context.Context, cli
client.Client, namespace string, podName string) (*corev1.Pod, *corev1.Service,
error) {
+ if pod, err := findPod(ctx, cli, namespace, podName); err != nil {
+ return nil, nil, err
+ } else {
+ queryLabels := pod.Labels
+ // pod-template-hash is pod dependent, mustn't be considered.
+ delete(queryLabels, podTemplateHashLabel)
+ if len(queryLabels) > 0 {
+ // check if we have a defined reference service
+ if serviceList, err2 := findServiceByLabels(ctx, cli,
namespace, queryLabels); err2 != nil {
+ return nil, nil, err
Review Comment:
```suggestion
if serviceList, err := findServiceByLabels(ctx, cli,
namespace, queryLabels); err != nil {
return nil, nil, err
```
Or you return `err2`.
##########
controllers/discovery/queries.go:
##########
@@ -0,0 +1,93 @@
+package discovery
+
+import (
+ "context"
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ networkingV1 "k8s.io/api/networking/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const podTemplateHashLabel = "pod-template-hash"
+
+// findService finds a service by name in the given namespace.
+func findService(ctx context.Context, cli client.Client, namespace string,
name string) (*corev1.Service, error) {
+ service := &corev1.Service{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), service); err
!= nil {
+ return nil, err
+ }
+ return service, nil
+}
+
+// findServiceByLabels finds a service by a set of matching labels in the
given namespace.
+func findServiceByLabels(ctx context.Context, cli client.Client, namespace
string, labels map[string]string) (*corev1.ServiceList, error) {
+ serviceList := &corev1.ServiceList{}
+ if err := cli.List(ctx, serviceList, client.InNamespace(namespace),
client.MatchingLabels(labels)); err != nil {
+ return nil, err
+ }
+ return serviceList, nil
+}
+
+// findPod finds a pod by name in the given namespace.
+func findPod(ctx context.Context, cli client.Client, namespace string, name
string) (*corev1.Pod, error) {
+ pod := &corev1.Pod{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), pod); err !=
nil {
+ return nil, err
+ }
+ return pod, nil
+}
+
+// findPodAndReferenceServiceByPodLabels finds a pod by name in the given
namespace at the same time it piggybacks it's
+// reference service if any. The reference service is determined by using the
same set of labels as the pod.
+func findPodAndReferenceServiceByPodLabels(ctx context.Context, cli
client.Client, namespace string, podName string) (*corev1.Pod, *corev1.Service,
error) {
+ if pod, err := findPod(ctx, cli, namespace, podName); err != nil {
+ return nil, nil, err
+ } else {
+ queryLabels := pod.Labels
+ // pod-template-hash is pod dependent, mustn't be considered.
+ delete(queryLabels, podTemplateHashLabel)
+ if len(queryLabels) > 0 {
+ // check if we have a defined reference service
+ if serviceList, err2 := findServiceByLabels(ctx, cli,
namespace, queryLabels); err2 != nil {
+ return nil, nil, err
+ } else if len(serviceList.Items) > 0 {
+ return pod, &serviceList.Items[0], nil
+ }
+ }
+ return pod, nil, nil
+ }
+}
+
+// findDeployment finds a deployment by name in the given namespace.
+func findDeployment(ctx context.Context, cli client.Client, namespace string,
name string) (*appsv1.Deployment, error) {
+ deployment := &appsv1.Deployment{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), deployment);
err != nil {
+ return nil, err
+ }
+ return deployment, nil
+}
+
+// findStatefulSet finds a stateful set by name in the given namespace.
+func findStatefulSet(ctx context.Context, cli client.Client, namespace string,
name string) (*appsv1.StatefulSet, error) {
+ statefulSet := &appsv1.StatefulSet{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), statefulSet);
err != nil {
+ return nil, err
+ }
+ return statefulSet, nil
+}
+
+// findIngress finds an ingress by name in the given namespace.
+func findIngress(ctx context.Context, cli client.Client, namespace string,
name string) (*networkingV1.Ingress, error) {
+ ingress := &networkingV1.Ingress{}
+ if err := cli.Get(ctx, buildObjectKey(namespace, name), ingress); err
!= nil {
+ return nil, err
+ }
+ return ingress, nil
+}
+
+func buildObjectKey(namespace string, name string) client.ObjectKey {
Review Comment:
You can use this instead:
https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/client/interfaces.go#L35
##########
controllers/discovery/uri_utils.go:
##########
@@ -0,0 +1,94 @@
+package discovery
+
+import (
+ "fmt"
+ corev1 "k8s.io/api/core/v1"
+ "strings"
+)
+
+func resolveServiceUri(service corev1.Service, customPort string, outputFormat
string) (string, error) {
+ var port int
+ var protocol string
+ var host string
+ var err error = nil
+
+ switch service.Spec.Type {
+ case corev1.ServiceTypeExternalName:
+ // ExternalName may not work properly with SSL:
+ //
https://kubernetes.io/docs/concepts/services-networking/service/#externalname
+ protocol = httpProtocol
+ host = service.Spec.ExternalName
+ port = 80
+ case corev1.ServiceTypeClusterIP:
+ protocol, host, port =
resolveClusterIPOrTypeNodeServiceUriParams(service, customPort)
+ case corev1.ServiceTypeNodePort:
+ protocol, host, port =
resolveClusterIPOrTypeNodeServiceUriParams(service, customPort)
+ case corev1.ServiceTypeLoadBalancer:
+ err = fmt.Errorf("%s type is not yet supported",
service.Spec.Type)
+ default:
+ err = fmt.Errorf("%s type is not yet supported",
service.Spec.Type)
+ }
+ if err != nil {
+ return "", err
+ }
+ if outputFormat == KubernetesDNSAddress {
+ return buildKubernetesServiceDNSUri(protocol,
service.Namespace, service.Name, port), nil
+ } else {
+ return buildURI(protocol, host, port), nil
+ }
+}
+
+// resolveClusterIPOrTypeNodeServiceUriParams returns the uri parameters for a
service of type ClusterIP or TypeNode.
+// The optional customPort can be used to determine which port should be used
for the communication, when not set,
+// the best suited port is returned. For this last, a secure port has
precedence over a no-secure port.
+func resolveClusterIPOrTypeNodeServiceUriParams(service corev1.Service,
customPort string) (protocol string, host string, port int) {
+ servicePort := findServicePort(service.Spec.Ports, customPort)
+ if isServicePortSecure(*servicePort) {
+ protocol = httpsProtocol
+ } else {
+ protocol = httpProtocol
+ }
+ host = service.Spec.ClusterIP
+ port = int(servicePort.Port)
+ return protocol, host, port
+}
+
+func resolvePodUri(pod *corev1.Pod, customContainer, customPort string,
outputFormat string) (string, error) {
+ if podIp := pod.Status.PodIP; len(podIp) == 0 {
+ return "", fmt.Errorf("pod: %s in namespace: %s, has no
allocated address", pod.Name, pod.Namespace)
+ } else {
+ var container *corev1.Container
+ if len(customContainer) > 0 {
+ container = findContainerByName(pod.Spec.Containers,
customContainer)
+ }
+ if container == nil {
+ container = &pod.Spec.Containers[0]
+ }
+ if containerPort := findContainerPort(container.Ports,
customPort); containerPort == nil {
+ return "", fmt.Errorf("no container port was found for
pod: %s in namespace: %s", pod.Name, pod.Namespace)
+ } else {
+ protocol := httpProtocol
+ if isSecure := isContainerPortSecure(*containerPort);
isSecure {
+ protocol = httpsProtocol
+ }
+ if outputFormat == KubernetesDNSAddress {
+ return buildKubernetesPodDNSUri(protocol,
pod.Namespace, podIp, int(containerPort.ContainerPort)), nil
+ } else {
+ return buildURI(protocol, podIp,
int(containerPort.ContainerPort)), nil
+ }
+ }
+ }
+}
+
+func buildURI(scheme string, host string, port int) string {
+ return fmt.Sprintf("%s://%s:%v", scheme, host, port)
+}
+
+func buildKubernetesServiceDNSUri(scheme string, namespace string, name
string, port int) string {
+ return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%v", scheme, name,
namespace, port)
Review Comment:
you can remove the `.cluster.local`. It might not work with headless
services. Just the Name and Namespace should resolve correctly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]