This is an automated email from the ASF dual-hosted git repository. astefanutti pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 3a3b0bcf227130c57d17704edc65e56bd908a10b Author: Antonin Stefanutti <[email protected]> AuthorDate: Thu Mar 4 18:47:31 2021 +0100 fix: Do not sink events when the operator lacks Event permissions --- go.mod | 1 + pkg/cmd/operator/operator.go | 23 +++++++++------- pkg/event/broadcaster.go | 62 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index a8ded7a..69f7bea 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( k8s.io/apimachinery v0.19.8 k8s.io/client-go v12.0.0+incompatible k8s.io/gengo v0.0.0-20200428234225-8167cfdcfc14 + k8s.io/klog/v2 v2.2.0 knative.dev/eventing v0.18.0 knative.dev/pkg v0.0.0-20200922164940-4bf40ad82aab knative.dev/serving v0.18.0 diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 7b07ef6..1938d28 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -43,6 +43,7 @@ import ( "github.com/apache/camel-k/pkg/apis" "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/controller" + "github.com/apache/camel-k/pkg/event" "github.com/apache/camel-k/pkg/install" "github.com/apache/camel-k/pkg/platform" "github.com/apache/camel-k/pkg/util/defaults" @@ -115,20 +116,23 @@ func Run(healthPort, monitoringPort int32) { // so that we can check the operator has been granted permission to create // Events. This is required for the operator to be installable by standard // admin users, that are not granted create permission on Events by default. - eventBroadcaster := record.NewBroadcaster() + broadcaster := record.NewBroadcaster() // nolint: gocritic - if ok, err := kubernetes.CheckPermission(context.TODO(), c, corev1.GroupName, "events", namespace, "", "create"); err != nil { - log.Error(err, "cannot check permissions for configuring event broadcaster") - } else if !ok { - log.Info("Event broadcasting to Kubernetes is disabled because of missing permissions to create events") + if ok, err := kubernetes.CheckPermission(context.TODO(), c, corev1.GroupName, "events", namespace, "", "create"); err != nil || !ok { + // Do not sink Events to the server as they'll be rejected + broadcaster = event.NewSinkLessBroadcaster(broadcaster) + if err != nil { + log.Error(err, "cannot check permissions for configuring event broadcaster") + } else if !ok { + log.Info("Event broadcasting to Kubernetes is disabled because of missing permissions to create events") + } } else { - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)}) + broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.CoreV1().Events(namespace)}) } - // Create a new Cmd to provide shared dependencies and start components mgr, err := ctrl.NewManager(cfg, ctrl.Options{ Namespace: namespace, - EventBroadcaster: eventBroadcaster, + EventBroadcaster: broadcaster, HealthProbeBindAddress: ":" + strconv.Itoa(int(healthPort)), MetricsBindAddress: ":" + strconv.Itoa(int(monitoringPort)), }) @@ -165,13 +169,12 @@ func Run(healthPort, monitoringPort int32) { log.Info("Starting the Cmd.") - // Start the Cmd if err := mgr.Start(signals.SetupSignalHandler()); err != nil { log.Error(err, "manager exited non-zero") os.Exit(1) } - eventBroadcaster.Shutdown() + broadcaster.Shutdown() } // getWatchNamespace returns the Namespace the operator should be watching for changes diff --git a/pkg/event/broadcaster.go b/pkg/event/broadcaster.go new file mode 100644 index 0000000..fd8f5be --- /dev/null +++ b/pkg/event/broadcaster.go @@ -0,0 +1,62 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" +) + +type sinkLessBroadcaster struct { + broadcaster record.EventBroadcaster +} + +func (s sinkLessBroadcaster) StartEventWatcher(eventHandler func(*corev1.Event)) watch.Interface { + return s.broadcaster.StartEventWatcher(eventHandler) +} + +func (s sinkLessBroadcaster) StartRecordingToSink(sink record.EventSink) watch.Interface { + return watch.NewEmptyWatch() +} + +func (s sinkLessBroadcaster) StartLogging(logf func(format string, args ...interface{})) watch.Interface { + return s.broadcaster.StartLogging(logf) +} + +func (s sinkLessBroadcaster) StartStructuredLogging(verbosity klog.Level) watch.Interface { + return s.broadcaster.StartStructuredLogging(verbosity) +} + +func (s sinkLessBroadcaster) NewRecorder(scheme *runtime.Scheme, source corev1.EventSource) record.EventRecorder { + return s.broadcaster.NewRecorder(scheme, source) +} + +func (s sinkLessBroadcaster) Shutdown() { + s.broadcaster.Shutdown() +} + +var _ record.EventBroadcaster = &sinkLessBroadcaster{} + +func NewSinkLessBroadcaster(broadcaster record.EventBroadcaster) record.EventBroadcaster { + return &sinkLessBroadcaster{ + broadcaster: broadcaster, + } +}
