This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 3a3b0bcf227130c57d17704edc65e56bd908a10b
Author: Antonin Stefanutti <[email protected]>
AuthorDate: Thu Mar 4 18:47:31 2021 +0100

    fix: Do not sink events when the operator lacks Event permissions
---
 go.mod                       |  1 +
 pkg/cmd/operator/operator.go | 23 +++++++++-------
 pkg/event/broadcaster.go     | 62 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/go.mod b/go.mod
index a8ded7a..69f7bea 100644
--- a/go.mod
+++ b/go.mod
@@ -48,6 +48,7 @@ require (
        k8s.io/apimachinery v0.19.8
        k8s.io/client-go v12.0.0+incompatible
        k8s.io/gengo v0.0.0-20200428234225-8167cfdcfc14
+       k8s.io/klog/v2 v2.2.0
        knative.dev/eventing v0.18.0
        knative.dev/pkg v0.0.0-20200922164940-4bf40ad82aab
        knative.dev/serving v0.18.0
diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go
index 7b07ef6..1938d28 100644
--- a/pkg/cmd/operator/operator.go
+++ b/pkg/cmd/operator/operator.go
@@ -43,6 +43,7 @@ import (
        "github.com/apache/camel-k/pkg/apis"
        "github.com/apache/camel-k/pkg/client"
        "github.com/apache/camel-k/pkg/controller"
+       "github.com/apache/camel-k/pkg/event"
        "github.com/apache/camel-k/pkg/install"
        "github.com/apache/camel-k/pkg/platform"
        "github.com/apache/camel-k/pkg/util/defaults"
@@ -115,20 +116,23 @@ func Run(healthPort, monitoringPort int32) {
        // so that we can check the operator has been granted permission to create
        // Events. This is required for the operator to be installable by standard
        // admin users, that are not granted create permission on Events by default.
-       eventBroadcaster := record.NewBroadcaster()
+       broadcaster := record.NewBroadcaster()
        // nolint: gocritic
-       if ok, err := kubernetes.CheckPermission(context.TODO(), c, corev1.GroupName, "events", namespace, "", "create"); err != nil {
-               log.Error(err, "cannot check permissions for configuring event broadcaster")
-       } else if !ok {
-               log.Info("Event broadcasting to Kubernetes is disabled because of missing permissions to create events")
+       if ok, err := kubernetes.CheckPermission(context.TODO(), c, corev1.GroupName, "events", namespace, "", "create"); err != nil || !ok {
+               // Do not sink Events to the server as they'll be rejected
+               broadcaster = event.NewSinkLessBroadcaster(broadcaster)
+               if err != nil {
+                       log.Error(err, "cannot check permissions for configuring event broadcaster")
+               } else if !ok {
+                       log.Info("Event broadcasting to Kubernetes is disabled because of missing permissions to create events")
+               }
        } else {
-               eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)})
+               broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)})
        }
 
-       // Create a new Cmd to provide shared dependencies and start components
        mgr, err := ctrl.NewManager(cfg, ctrl.Options{
                Namespace:              namespace,
-               EventBroadcaster:       eventBroadcaster,
+               EventBroadcaster:       broadcaster,
                HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)),
                MetricsBindAddress:     ":" + strconv.Itoa(int(monitoringPort)),
        })
@@ -165,13 +169,12 @@ func Run(healthPort, monitoringPort int32) {
 
        log.Info("Starting the Cmd.")
 
-       // Start the Cmd
        if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
                log.Error(err, "manager exited non-zero")
                os.Exit(1)
        }
 
-       eventBroadcaster.Shutdown()
+       broadcaster.Shutdown()
 }
 
 // getWatchNamespace returns the Namespace the operator should be watching for changes
diff --git a/pkg/event/broadcaster.go b/pkg/event/broadcaster.go
new file mode 100644
index 0000000..fd8f5be
--- /dev/null
+++ b/pkg/event/broadcaster.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package event
+
+import (
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "k8s.io/client-go/tools/record"
+       "k8s.io/klog/v2"
+)
+
+type sinkLessBroadcaster struct {
+       broadcaster record.EventBroadcaster
+}
+
+func (s sinkLessBroadcaster) StartEventWatcher(eventHandler func(*corev1.Event)) watch.Interface {
+       return s.broadcaster.StartEventWatcher(eventHandler)
+}
+
+func (s sinkLessBroadcaster) StartRecordingToSink(sink record.EventSink) watch.Interface {
+       return watch.NewEmptyWatch()
+}
+
+func (s sinkLessBroadcaster) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
+       return s.broadcaster.StartLogging(logf)
+}
+
+func (s sinkLessBroadcaster) StartStructuredLogging(verbosity klog.Level) watch.Interface {
+       return s.broadcaster.StartStructuredLogging(verbosity)
+}
+
+func (s sinkLessBroadcaster) NewRecorder(scheme *runtime.Scheme, source corev1.EventSource) record.EventRecorder {
+       return s.broadcaster.NewRecorder(scheme, source)
+}
+
+func (s sinkLessBroadcaster) Shutdown() {
+       s.broadcaster.Shutdown()
+}
+
+var _ record.EventBroadcaster = &sinkLessBroadcaster{}
+
+func NewSinkLessBroadcaster(broadcaster record.EventBroadcaster) record.EventBroadcaster {
+       return &sinkLessBroadcaster{
+               broadcaster: broadcaster,
+       }
+}

Reply via email to