This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 0b9f4adf feat: Improve the log library and optimize its content. (#812)
0b9f4adf is described below
commit 0b9f4adf917b2a12d7ed0d661ead93a99d0f20bc
Author: Jian Zhong <[email protected]>
AuthorDate: Sat Nov 8 00:36:28 2025 +0800
feat: Improve the log library and optimize its content. (#812)
---
.asf.yaml | 3 +-
Dockerfile | 22 --
README.md | 14 +-
.../dubbo-discovery/templates/clusterrole.yaml | 5 -
pkg/cmd/cmd.go | 13 +-
pkg/config/schema/collection/schema.go | 2 +-
pkg/kube/client.go | 5 +-
pkg/kube/controllers/queue.go | 54 ++-
pkg/kube/krt/helpers.go | 8 +-
pkg/log/deduplicator.go | 204 +++++++++++
pkg/log/formatter.go | 238 +++++++++++++
pkg/log/interceptor.go | 133 +++++++
pkg/log/klog_formatter.go | 156 +++++++++
pkg/log/klog_interceptor.go | 19 +
pkg/log/log.go | 382 +++++++++++++++++++++
pkg/log/pretty.go | 195 +++++++++++
sail/cmd/sail-discovery/app/cmd.go | 4 +-
sail/pkg/bootstrap/certcontroller.go | 34 +-
sail/pkg/bootstrap/configcontroller.go | 14 +-
sail/pkg/bootstrap/discovery.go | 6 -
sail/pkg/bootstrap/dubbo_ca.go | 76 ++--
sail/pkg/bootstrap/mesh.go | 15 +-
sail/pkg/bootstrap/proxylessinjector.go | 12 +-
sail/pkg/bootstrap/server.go | 85 +++--
sail/pkg/bootstrap/servicecontroller.go | 6 +-
sail/pkg/bootstrap/validation.go | 6 +-
sail/pkg/bootstrap/webhook.go | 10 +-
sail/pkg/config/kube/crdclient/client.go | 28 +-
sail/pkg/leaderelection/leaderelection.go | 12 +-
sail/pkg/model/cluster_local.go | 4 +-
sail/pkg/model/context.go | 2 -
sail/pkg/model/endpointshards.go | 20 +-
sail/pkg/model/push_context.go | 8 +-
sail/pkg/model/typed_xds_cache.go | 4 +-
sail/pkg/model/xds_cache.go | 6 +-
sail/pkg/networking/grpcgen/cds.go | 6 +-
sail/pkg/server/instance.go | 10 +-
sail/pkg/serviceregistry/aggregate/controller.go | 3 +-
.../kube/controller/endpointslice.go | 20 +-
.../kube/controller/multicluster.go | 5 +-
sail/pkg/xds/discovery.go | 22 +-
security/pkg/pki/ca/ca.go | 22 +-
{test => tests}/grpc-proxyless/.dockerignore | 0
{test => tests}/grpc-proxyless/.gitignore | 0
{test => tests}/grpc-proxyless/Dockerfile.consumer | 0
{test => tests}/grpc-proxyless/Dockerfile.producer | 0
{test => tests}/grpc-proxyless/README.md | 0
{test => tests}/grpc-proxyless/consumer/main.go | 0
{test => tests}/grpc-proxyless/generate-proto.sh | 0
{test => tests}/grpc-proxyless/go.mod | 0
{test => tests}/grpc-proxyless/go.sum | 0
{test => tests}/grpc-proxyless/producer/main.go | 0
{test => tests}/grpc-proxyless/proto/echo.proto | 0
{test => tests}/grpc-proxyless/proto/gen.sh | 0
{test => tests}/grpc-proxyless/test-commands.sh | 0
{test => tests}/grpc-proxyless/test.sh | 0
{security/tools => tools}/generate_cert/main.go | 0
{security/tools => tools}/generate_csr/main.go | 0
58 files changed, 1599 insertions(+), 294 deletions(-)
diff --git a/.asf.yaml b/.asf.yaml
index d4c12aaa..d939204f 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -21,7 +21,7 @@ notifications:
github:
homepage: https://dubbo.apache.org/
- description: "The Dubbo Kubernetes integration."
+ description: "Proxyless service mesh for Dubbo microservices on Kubernetes - zero-latency communication, traffic management, and security without sidecars."
features:
# Enable wiki for documentation
wiki: false
@@ -39,4 +39,3 @@ github:
- microservices
- service-mesh
- proxyless
- - ai
diff --git a/Dockerfile b/Dockerfile
deleted file mode 100644
index eb00cedd..00000000
--- a/Dockerfile
+++ /dev/null
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM gcr.io/distroless/static:debug AS discovery
-COPY sail-discovery /usr/local/bin/sail-discovery
-ENTRYPOINT ["/usr/local/bin/sail-discovery"]
-
-FROM gcr.io/distroless/static:debug AS proxy
-COPY sail-agent /usr/local/bin/sail-agent
-ENTRYPOINT ["/usr/local/bin/sail-agent"]
\ No newline at end of file
diff --git a/README.md b/README.md
index fc5687a4..7bb4e052 100644
--- a/README.md
+++ b/README.md
@@ -6,23 +6,27 @@ Dubbo Kubernetes
[](https://codecov.io/gh/apache/dubbo-kubernetes)

-Provides support for building and deploying Dubbo applications in various environments, including Kubernetes and Alibaba Cloud ACK.
+A cloud-native **proxyless service mesh** platform for Dubbo microservices, enabling zero-latency inter-service communication across multiple protocols, advanced traffic management, and enterprise-grade security without sidecar proxies. Built on Istio's control plane architecture with native Kubernetes integration, it delivers service mesh capabilities with minimal resource overhead and maximum performance.
+
+## Project Structure
-## Repositories
The main repositories of Dubbo on Kubernetes include:
- **dubboctl** — The command-line management tool that provides control plane management, development framework scaffolding, and application deployment.
- **dubbod** — The dubbo control plane. It is built on Istio to implement a proxyless service mesh and includes the following components:
- - **sail** - Runtime proxy configuration.
- - **aegis** - Certificate issuance and rotation.
- - **gear** - Validation, aggregation, transformation, and distribution of Dubbo configuration.
+ - **sail** - Runtime proxy configuration and XDS server
+ - **aegis** - Certificate issuance and rotation
+ - **gear** - Validation, aggregation, transformation, and distribution of Dubbo configuration
- **operator**: Provides user-friendly options to operate the Dubbo proxyless service mesh.
## Quick Start
+
Please refer to [official website](https://cn.dubbo.apache.org/zh-cn/overview/home/)
## Contributing
+
Refer to [CONTRIBUTING.md](https://github.com/apache/dubbo-kubernetes/blob/master/CONTRIBUTING.md)
## License
+
Apache License 2.0, see [LICENSE](https://github.com/apache/dubbo-kubernetes/blob/master/LICENSE).
diff --git a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
index d800d1f6..d3836eb1 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/templates/clusterrole.yaml
@@ -10,11 +10,6 @@ rules:
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations"]
verbs: ["get", "list", "watch", "update"]
- - apiGroups: ["security.istio.io", "networking.istio.io"]
- verbs: ["get", "watch", "list"]
- resources: ["*"]
- - apiGroups: ["security.istio.io"]
- verbs: [ "get", "watch", "list", "update", "patch", "create", "delete" ]
resources: [ "authorizationpolicies/status" ]
- apiGroups: [""]
verbs: [ "get", "watch", "list", "update", "patch", "create", "delete" ]
diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index 5d216b66..7751963a 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -21,11 +21,13 @@ import (
"context"
"flag"
"fmt"
- "github.com/spf13/cobra"
- "github.com/spf13/pflag"
"os"
"os/signal"
"syscall"
+
+ "github.com/apache/dubbo-kubernetes/pkg/log"
+ "github.com/spf13/cobra"
+ "github.com/spf13/pflag"
)
func AddFlags(rootCmd *cobra.Command) {
@@ -34,15 +36,18 @@ func AddFlags(rootCmd *cobra.Command) {
func PrintFlags(flags *pflag.FlagSet) {
flags.VisitAll(func(flag *pflag.Flag) {
- fmt.Printf("FLAG: --%s=%q\n", flag.Name, flag.Value)
+ log.Infof("FLAG: --%s=%q\n", flag.Name, flag.Value)
})
}
func WaitSignal(stop chan struct{}) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
- <-sigs
+ sig := <-sigs
+ // Log the signal received
+ log.Infof("Received signal: %v, initiating graceful shutdown...", sig)
close(stop)
+ // Return immediately after closing stop channel to allow cleanup to proceed
}
func WaitSignalFunc(cancel context.CancelCauseFunc) {
diff --git a/pkg/config/schema/collection/schema.go b/pkg/config/schema/collection/schema.go
index 3ca6637a..05716dd2 100644
--- a/pkg/config/schema/collection/schema.go
+++ b/pkg/config/schema/collection/schema.go
@@ -365,7 +365,7 @@ func (s *schemaImpl) Equal(o Schema) bool {
s.ProtoPackage() == o.ProtoPackage()
}
-// FromKubernetesGVK converts a Kubernetes GVK to an Istio GVK
+// FromKubernetesGVK converts a Kubernetes GVK to a Dubbo GVK
func FromKubernetesGVK(in *schema.GroupVersionKind) config.GroupVersionKind {
return config.GroupVersionKind{
Group: in.Group,
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 85a68452..a82cc317 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -20,6 +20,7 @@ package kube
import (
"context"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"net/http"
"time"
@@ -362,9 +363,9 @@ func WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.Inf
attempt := 0
defer func() {
if r {
- klog.Infof("sync complete: name=%s, time=%v", name, time.Since(t0))
+ log.Infof("sync complete: name=%s, time=%v", name, time.Since(t0))
} else {
- klog.Infof("sync failed: name=%s, time=%v", name, time.Since(t0))
+ log.Infof("sync failed: name=%s, time=%v", name, time.Since(t0))
}
}()
for {
diff --git a/pkg/kube/controllers/queue.go b/pkg/kube/controllers/queue.go
index 87e25a19..e435da16 100644
--- a/pkg/kube/controllers/queue.go
+++ b/pkg/kube/controllers/queue.go
@@ -19,10 +19,10 @@ package controllers
import (
"github.com/apache/dubbo-kubernetes/pkg/config"
+ dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
- "k8s.io/klog/v2"
)
type ReconcilerFn func(key types.NamespacedName) error
@@ -34,6 +34,7 @@ type Queue struct {
maxAttempts int
workFn func(key any) error
closed chan struct{}
+ log *dubbolog.Scope
}
func NewQueue(name string, options ...func(*Queue)) Queue {
@@ -56,7 +57,6 @@ func NewQueue(name string, options ...func(*Queue)) Queue {
},
)
}
- klog.Infof("controller=%v", q.name)
return q
}
@@ -72,34 +72,10 @@ func (q Queue) HasSynced() bool {
return q.initialSync.Load()
}
-func WithRateLimiter(r workqueue.TypedRateLimiter[any]) func(q *Queue) {
- return func(q *Queue) {
- q.queue = workqueue.NewTypedRateLimitingQueue[any](r)
- }
-}
-
-func WithMaxAttempts(n int) func(q *Queue) {
- return func(q *Queue) {
- q.maxAttempts = n
- }
-}
-
-func WithReconciler(f ReconcilerFn) func(q *Queue) {
- return func(q *Queue) {
- q.workFn = func(key any) error {
- return f(key.(types.NamespacedName))
- }
- }
-}
-
func (q Queue) ShutDownEarly() {
q.queue.ShutDown()
}
-type syncSignal struct{}
-
-var defaultSyncSignal = syncSignal{}
-
func (q Queue) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := q.queue.Get()
@@ -133,7 +109,6 @@ func (q Queue) processNextItem() bool {
func (q Queue) Run(stop <-chan struct{}) {
defer q.queue.ShutDown()
- klog.Infof("starting")
q.queue.Add(defaultSyncSignal)
go func() {
// Process updates until we return false, which indicates the queue is terminated
@@ -145,9 +120,32 @@ func (q Queue) Run(stop <-chan struct{}) {
case <-stop:
case <-q.closed:
}
- klog.Info("stopped")
}
func (q Queue) Closed() <-chan struct{} {
return q.closed
}
+
+func WithRateLimiter(r workqueue.TypedRateLimiter[any]) func(q *Queue) {
+ return func(q *Queue) {
+ q.queue = workqueue.NewTypedRateLimitingQueue[any](r)
+ }
+}
+
+func WithMaxAttempts(n int) func(q *Queue) {
+ return func(q *Queue) {
+ q.maxAttempts = n
+ }
+}
+
+func WithReconciler(f ReconcilerFn) func(q *Queue) {
+ return func(q *Queue) {
+ q.workFn = func(key any) error {
+ return f(key.(types.NamespacedName))
+ }
+ }
+}
+
+type syncSignal struct{}
+
+var defaultSyncSignal = syncSignal{}
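The queue changes above reorder the `WithRateLimiter`/`WithMaxAttempts`/`WithReconciler` helpers, which follow Go's functional-options pattern. A minimal standalone sketch of how such options compose (illustrative names, not the actual controllers API):

```go
package main

import "fmt"

// a toy queue configured via functional options, mirroring the shape of
// NewQueue(name string, options ...func(*Queue)) in pkg/kube/controllers
type queue struct {
	name        string
	maxAttempts int
}

type option func(*queue)

// withMaxAttempts mirrors WithMaxAttempts: it returns a closure that
// mutates the queue under construction.
func withMaxAttempts(n int) option {
	return func(q *queue) { q.maxAttempts = n }
}

func newQueue(name string, opts ...option) queue {
	q := queue{name: name}
	for _, o := range opts {
		o(&q)
	}
	return q
}

func main() {
	q := newQueue("endpoints", withMaxAttempts(5))
	fmt.Println(q.name, q.maxAttempts) // → endpoints 5
}
```

The pattern keeps `NewQueue`'s signature stable while letting callers opt into a rate limiter, retry cap, or reconciler independently.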
diff --git a/pkg/kube/krt/helpers.go b/pkg/kube/krt/helpers.go
index 3ad642bc..12cd3c17 100644
--- a/pkg/kube/krt/helpers.go
+++ b/pkg/kube/krt/helpers.go
@@ -19,7 +19,7 @@ package krt
import (
"fmt"
- "k8s.io/klog/v2"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"reflect"
"strconv"
"strings"
@@ -149,16 +149,16 @@ func waitForCacheSync(name string, stop <-chan struct{}, collections ...<-chan s
t0 := time.Now()
defer func() {
if r {
- klog.Infof("sync complete: name=%s, time=%v", name, time.Since(t0))
+ log.Infof("sync complete: name=%s, time=%v", name, time.Since(t0))
} else {
- klog.Infof("sync failed: name=%s, time=%v", name, time.Since(t0))
+ log.Infof("sync failed: name=%s, time=%v", name, time.Since(t0))
}
}()
for _, col := range collections {
for {
select {
case <-t.C:
- klog.Infof("waiting for sync...: name=%s, time=%v\n", name, time.Since(t0))
+ log.Infof("waiting for sync...: name=%s, time=%v\n", name, time.Since(t0))
continue
case <-stop:
return false
diff --git a/pkg/log/deduplicator.go b/pkg/log/deduplicator.go
new file mode 100644
index 00000000..a5e73fed
--- /dev/null
+++ b/pkg/log/deduplicator.go
@@ -0,0 +1,204 @@
+package log
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+// LogKey represents a unique key for deduplication
+type LogKey struct {
+ Scope string
+ Level string
+ Message string
+}
+
+// DedupEntry represents a log entry with deduplication info
+type DedupEntry struct {
+ FirstSeen time.Time
+ LastSeen time.Time
+ Count int64
+}
+
+// Deduplicator prevents duplicate log messages from flooding the output
+type Deduplicator struct {
+ entries map[LogKey]*DedupEntry
+ mu sync.RWMutex
+ window time.Duration // time window for deduplication
+ maxCount int64 // max count before forcing output
+ enabled bool
+ cleanupTicker *time.Ticker
+ stopCleanup chan bool
+}
+
+var (
+ globalDeduplicator *Deduplicator
+ dedupOnce sync.Once
+)
+
+// DefaultDeduplicationWindow is the default time window for deduplication
+const DefaultDeduplicationWindow = 5 * time.Second
+
+// DefaultMaxDeduplicationCount is the default max count before forcing output
+const DefaultMaxDeduplicationCount = 100
+
+// GetDeduplicator returns the global deduplicator instance
+func GetDeduplicator() *Deduplicator {
+ dedupOnce.Do(func() {
+ globalDeduplicator = NewDeduplicator(DefaultDeduplicationWindow, DefaultMaxDeduplicationCount)
+ })
+ return globalDeduplicator
+}
+
+// NewDeduplicator creates a new deduplicator
+func NewDeduplicator(window time.Duration, maxCount int64) *Deduplicator {
+ d := &Deduplicator{
+ entries: make(map[LogKey]*DedupEntry),
+ window: window,
+ maxCount: maxCount,
+ enabled: true,
+ stopCleanup: make(chan bool),
+ }
+
+ // Start cleanup goroutine
+ d.cleanupTicker = time.NewTicker(window * 2)
+ go d.cleanup()
+
+ return d
+}
+
+// ShouldLog checks if a log should be output and updates the deduplication state
+// Returns (shouldLog, count, summaryMessage)
+func (d *Deduplicator) ShouldLog(key LogKey) (bool, int64, string) {
+ if !d.enabled {
+ return true, 1, ""
+ }
+
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ now := time.Now()
+ entry, exists := d.entries[key]
+
+ if !exists {
+ // First time seeing this log
+ d.entries[key] = &DedupEntry{
+ FirstSeen: now,
+ LastSeen: now,
+ Count: 1,
+ }
+ return true, 1, ""
+ }
+
+ // Update entry
+ entry.Count++
+ entry.LastSeen = now
+
+ // Check if we should output
+ timeSinceFirst := now.Sub(entry.FirstSeen)
+
+ // Force output if:
+ // 1. Time window has passed
+ // 2. Count exceeds max
+ if timeSinceFirst >= d.window || entry.Count >= d.maxCount {
+ count := entry.Count
+ delete(d.entries, key)
+
+ if count > 1 {
+ summary := fmt.Sprintf("(repeated %d times in %v)", count, timeSinceFirst)
+ return true, count, summary
+ }
+ return true, 1, ""
+ }
+
+ // Suppress duplicate log
+ return false, entry.Count, ""
+}
+
+// Enable enables deduplication
+func (d *Deduplicator) Enable() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.enabled = true
+}
+
+// Disable disables deduplication
+func (d *Deduplicator) Disable() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.enabled = false
+ // Clear entries when disabled
+ d.entries = make(map[LogKey]*DedupEntry)
+}
+
+// IsEnabled returns whether deduplication is enabled
+func (d *Deduplicator) IsEnabled() bool {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ return d.enabled
+}
+
+// SetWindow sets the deduplication time window
+func (d *Deduplicator) SetWindow(window time.Duration) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.window = window
+ if d.cleanupTicker != nil {
+ d.cleanupTicker.Stop()
+ }
+ d.cleanupTicker = time.NewTicker(window * 2)
+}
+
+// SetMaxCount sets the maximum count before forcing output
+func (d *Deduplicator) SetMaxCount(maxCount int64) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.maxCount = maxCount
+}
+
+// cleanup periodically removes old entries
+func (d *Deduplicator) cleanup() {
+ for {
+ select {
+ case <-d.cleanupTicker.C:
+ d.mu.Lock()
+ now := time.Now()
+ for key, entry := range d.entries {
+ // Remove entries that are older than 2x the window
+ if now.Sub(entry.LastSeen) > d.window*2 {
+ delete(d.entries, key)
+ }
+ }
+ d.mu.Unlock()
+ case <-d.stopCleanup:
+ return
+ }
+ }
+}
+
+// Stop stops the cleanup goroutine
+func (d *Deduplicator) Stop() {
+ if d.cleanupTicker != nil {
+ d.cleanupTicker.Stop()
+ }
+ close(d.stopCleanup)
+}
+
+// Clear clears all deduplication entries
+func (d *Deduplicator) Clear() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.entries = make(map[LogKey]*DedupEntry)
+}
+
+// Stats returns deduplication statistics
+func (d *Deduplicator) Stats() map[LogKey]int64 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+
+ stats := make(map[LogKey]int64)
+ for key, entry := range d.entries {
+ stats[key] = entry.Count
+ }
+ return stats
+}
diff --git a/pkg/log/formatter.go b/pkg/log/formatter.go
new file mode 100644
index 00000000..7d6e0fa9
--- /dev/null
+++ b/pkg/log/formatter.go
@@ -0,0 +1,238 @@
+package log
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+)
+
+// LogEntry represents a parsed log entry
+type LogEntry struct {
+ Timestamp time.Time
+ Level string
+ Scope string
+ Message string
+ Original string
+}
+
+// Formatter formats log entries in Istio style
+type Formatter struct {
+ patterns []*LogPattern
+}
+
+// LogPattern represents a pattern for recognizing different log formats
+type LogPattern struct {
+ Name string
+ Pattern *regexp.Regexp
+ Extractor func([]string) *LogEntry
+}
+
+var (
+ defaultFormatter *Formatter
+ formatterOnce sync.Once
+)
+
+// GetFormatter returns the default formatter instance
+func GetFormatter() *Formatter {
+ formatterOnce.Do(func() {
+ defaultFormatter = NewFormatter()
+ })
+ return defaultFormatter
+}
+
+// NewFormatter creates a new formatter with built-in patterns
+func NewFormatter() *Formatter {
+ f := &Formatter{
+ patterns: []*LogPattern{
+ // Klog pattern: I0926 16:53:33.461184 1 controller.go:123] message
+ // or: I0926 16:53:33.461184 1 controller.go:123] successfully acquired lease istio-system/istio-namespace-controller-election
+ {
+ Name: "klog",
+ Pattern: regexp.MustCompile(`^([IWEF])(\d{4} \d{2}:\d{2}:\d{2}\.\d+)\s+\d+\s+[^\s]+\]\s+(.+)$`),
+ Extractor: func(matches []string) *LogEntry {
+ if len(matches) < 4 {
+ return nil
+ }
+ level := klogLevelToIstio(matches[1])
+ timestamp := parseKlogTimestamp(matches[2])
+ message := matches[3]
+ return &LogEntry{
+ Timestamp: timestamp,
+ Level: level,
+ Scope: "klog",
+ Message: message,
+ Original: strings.Join(matches, " "),
+ }
+ },
+ },
+ // Standard log pattern: 2025-09-26T16:53:33.461184Z info message
+ {
+ Name: "standard",
+ Pattern: regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\s+(\w+)\s+(.+)$`),
+ Extractor: func(matches []string) *LogEntry {
+ if len(matches) < 4 {
+ return nil
+ }
+ timestamp, _ := time.Parse(time.RFC3339Nano, matches[1])
+ return &LogEntry{
+ Timestamp: timestamp,
+ Level: matches[2],
+ Scope: "default",
+ Message: matches[3],
+ Original: strings.Join(matches, " "),
+ }
+ },
+ },
+ // Logrus pattern: time="2025-09-26T16:53:33Z" level=info msg="message"
+ {
+ Name: "logrus",
+ Pattern: regexp.MustCompile(`^time="([^"]+)"\s+level=(\w+)\s+msg="([^"]+)"`),
+ Extractor: func(matches []string) *LogEntry {
+ if len(matches) < 4 {
+ return nil
+ }
+ timestamp, _ := time.Parse(time.RFC3339, matches[1])
+ return &LogEntry{
+ Timestamp: timestamp,
+ Level: matches[2],
+ Scope: "logrus",
+ Message: matches[3],
+ Original: strings.Join(matches, " "),
+ }
+ },
+ },
+ },
+ }
+ return f
+}
+
+// Format formats a log line in Istio style
+// Removes any trailing newlines to prevent blank lines
+func (f *Formatter) Format(line string) string {
+ line = strings.TrimSpace(line)
+ if line == "" {
+ return ""
+ }
+
+ // Remove any newlines from the input line
+ line = strings.TrimRight(line, "\n\r")
+ line = strings.ReplaceAll(line, "\n", " ")
+ line = strings.ReplaceAll(line, "\r", " ")
+
+ // Try to match against known patterns
+ for _, pattern := range f.patterns {
+ matches := pattern.Pattern.FindStringSubmatch(line)
+ if matches != nil {
+ entry := pattern.Extractor(matches)
+ if entry != nil {
+ // Ensure entry message has no newlines
+ entry.Message = strings.TrimRight(entry.Message, "\n\r")
+ entry.Message = strings.ReplaceAll(entry.Message, "\n", " ")
+ entry.Message = strings.ReplaceAll(entry.Message, "\r", " ")
+ return f.formatEntry(entry)
+ }
+ }
+ }
+
+ // If no pattern matches, treat as plain message with current time
+ now := time.Now().UTC()
+ return formatStandardLine(
+ formatFixedWidthTimestamp(now),
+ "info",
+ "unknown",
+ line,
+ )
+}
+
+// formatEntry formats a log entry in standard style with alignment
+func (f *Formatter) formatEntry(entry *LogEntry) string {
+ return formatStandardLine(
+ formatFixedWidthTimestamp(entry.Timestamp),
+ entry.Level,
+ entry.Scope,
+ entry.Message,
+ )
+}
+
+// klogLevelToIstio converts klog level to Istio level
+func klogLevelToIstio(klogLevel string) string {
+ switch klogLevel {
+ case "I":
+ return "info"
+ case "W":
+ return "warn"
+ case "E":
+ return "error"
+ case "F":
+ return "error"
+ default:
+ return "info"
+ }
+}
+
+// parseKlogTimestamp parses klog timestamp format (MMDD HH:MM:SS.microseconds)
+func parseKlogTimestamp(klogTime string) time.Time {
+ now := time.Now()
+ // Klog format: MMDD HH:MM:SS.microseconds
+ // We need to construct a full timestamp
+ parts := strings.Fields(klogTime)
+ if len(parts) != 2 {
+ return now.UTC()
+ }
+
+ datePart := parts[0] // MMDD
+ timePart := parts[1] // HH:MM:SS.microseconds
+
+ // Parse time part
+ timeParts := strings.Split(timePart, ".")
+ if len(timeParts) != 2 {
+ return now.UTC()
+ }
+
+ hourMinSec := timeParts[0]
+ microseconds := timeParts[1]
+
+ // Parse hour:min:sec
+ hmsParts := strings.Split(hourMinSec, ":")
+ if len(hmsParts) != 3 {
+ return now.UTC()
+ }
+
+ // Use current year and parse month/day from datePart
+ // Klog uses MMDD format, but we need to handle year correctly
+ // If the date is in the future (e.g., parsed date > current date), use previous year
+ year := now.Year()
+ month := 0
+ day := 0
+ if len(datePart) == 4 {
+ fmt.Sscanf(datePart, "%02d%02d", &month, &day)
+ }
+
+ hour := 0
+ min := 0
+ sec := 0
+ fmt.Sscanf(hmsParts[0], "%d", &hour)
+ fmt.Sscanf(hmsParts[1], "%d", &min)
+ fmt.Sscanf(hmsParts[2], "%d", &sec)
+
+ // Parse microseconds (pad to nanoseconds)
+ micro := 0
+ fmt.Sscanf(microseconds, "%d", &micro)
+ nanos := micro * 1000
+
+ if month == 0 || day == 0 {
+ return now.UTC()
+ }
+
+ t := time.Date(year, time.Month(month), day, hour, min, sec, nanos, time.UTC)
+
+ // If the parsed time is in the future (more than 1 day ahead), it's likely from previous year
+ // This handles cases where klog logs are from a different year
+ if t.After(now.Add(24 * time.Hour)) {
+ t = time.Date(year-1, time.Month(month), day, hour, min, sec, nanos, time.UTC)
+ }
+
+ return t
+}
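The klog recognition pattern added in formatter.go can be checked in isolation. The regex below is copied from the patch; the sample log line is illustrative:

```go
package main

import (
	"fmt"
	"regexp"
)

// the klog pattern from pkg/log/formatter.go: a level letter (I/W/E/F),
// an MMDD HH:MM:SS.microseconds timestamp, a pid, "file:line]", then the message
var klogRe = regexp.MustCompile(`^([IWEF])(\d{4} \d{2}:\d{2}:\d{2}\.\d+)\s+\d+\s+[^\s]+\]\s+(.+)$`)

func main() {
	line := "I0926 16:53:33.461184 1 controller.go:123] successfully acquired lease"
	m := klogRe.FindStringSubmatch(line)
	if m == nil {
		fmt.Println("no match")
		return
	}
	// m[1]=level letter, m[2]=timestamp, m[3]=message
	fmt.Printf("level=%s time=%s msg=%s\n", m[1], m[2], m[3])
}
```

The three capture groups feed the `Extractor`, which maps "I" to "info", parses the timestamp via `parseKlogTimestamp`, and carries the message through.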
diff --git a/pkg/log/interceptor.go b/pkg/log/interceptor.go
new file mode 100644
index 00000000..500e8b49
--- /dev/null
+++ b/pkg/log/interceptor.go
@@ -0,0 +1,133 @@
+package log
+
+import (
+ "io"
+ "os"
+ "strings"
+ "sync"
+)
+
+// Interceptor intercepts log output from other logging libraries and reformats it
+type Interceptor struct {
+ formatter *Formatter
+ original io.Writer
+ mu sync.Mutex
+ enabled bool
+}
+
+var (
+ globalInterceptor *Interceptor
+ interceptorOnce sync.Once
+)
+
+// GetInterceptor returns the global interceptor instance
+func GetInterceptor() *Interceptor {
+ interceptorOnce.Do(func() {
+ globalInterceptor = NewInterceptor()
+ })
+ return globalInterceptor
+}
+
+// NewInterceptor creates a new interceptor
+func NewInterceptor() *Interceptor {
+ return &Interceptor{
+ formatter: GetFormatter(),
+ original: os.Stderr,
+ enabled: true,
+ }
+}
+
+// Enable enables the interceptor
+func (i *Interceptor) Enable() {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ i.enabled = true
+}
+
+// Disable disables the interceptor
+func (i *Interceptor) Disable() {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ i.enabled = false
+}
+
+// IsEnabled returns whether the interceptor is enabled
+func (i *Interceptor) IsEnabled() bool {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ return i.enabled
+}
+
+// Writer returns a writer that intercepts and reformats log output
+func (i *Interceptor) Writer() io.Writer {
+ return &interceptWriter{interceptor: i}
+}
+
+// SetOutput sets the output writer for the interceptor
+func (i *Interceptor) SetOutput(w io.Writer) {
+ i.mu.Lock()
+ defer i.mu.Unlock()
+ i.original = w
+}
+
+// interceptWriter wraps the interceptor to implement io.Writer
+type interceptWriter struct {
+ interceptor *Interceptor
+ buffer []byte
+ mu sync.Mutex
+}
+
+func (w *interceptWriter) Write(p []byte) (n int, err error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ if !w.interceptor.IsEnabled() {
+ return w.interceptor.original.Write(p)
+ }
+
+ // Append to buffer
+ w.buffer = append(w.buffer, p...)
+
+ // Process complete lines
+ lines := []string{}
+ lineStart := 0
+ for i, b := range w.buffer {
+ if b == '\n' {
+ line := string(w.buffer[lineStart:i])
+ // Trim whitespace but keep non-empty lines
+ line = strings.TrimSpace(line)
+ if line != "" {
+ lines = append(lines, line)
+ }
+ lineStart = i + 1
+ }
+ }
+
+ // Keep incomplete line in buffer
+ if lineStart < len(w.buffer) {
+ w.buffer = w.buffer[lineStart:]
+ } else {
+ w.buffer = w.buffer[:0]
+ }
+
+ // Format and write each line (formatter ensures no blank lines)
+ for _, line := range lines {
+ formatted := w.interceptor.formatter.Format(line)
+ // Remove all newlines and add exactly one
+ formatted = strings.TrimRight(formatted, "\n\r \t")
+ if formatted != "" {
+ formatted = formatted + "\n"
+ w.interceptor.original.Write([]byte(formatted))
+ }
+ }
+
+ return len(p), nil
+}
+
+// SetupKlogInterceptor sets up klog interception using klog's SetOutput
+// This function should be called early in the program initialization
+// The actual implementation is in klog_interceptor.go (with build tag)
+func SetupKlogInterceptor() {
+ // This will be implemented in a separate file that imports klog
+ // to avoid forcing all users to have klog as a dependency
+}
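The `interceptWriter` above reassembles log lines from arbitrary `Write` chunks. A minimal sketch of that buffering behavior (simplified: complete lines are just collected rather than reformatted; illustrative names, not the actual pkg/log types):

```go
package main

import (
	"fmt"
	"strings"
)

// sketch of the interceptWriter buffering in pkg/log/interceptor.go:
// written bytes accumulate until a newline; each complete line is trimmed
// and collected, and any incomplete tail stays buffered for the next Write.
type lineWriter struct {
	buf   []byte
	lines []string
}

func (w *lineWriter) Write(p []byte) (int, error) {
	w.buf = append(w.buf, p...)
	start := 0
	for i, b := range w.buf {
		if b == '\n' {
			if line := strings.TrimSpace(string(w.buf[start:i])); line != "" {
				w.lines = append(w.lines, line)
			}
			start = i + 1
		}
	}
	// keep the incomplete tail for the next call
	w.buf = append(w.buf[:0], w.buf[start:]...)
	return len(p), nil
}

func main() {
	w := &lineWriter{}
	w.Write([]byte("first line\nsecond "))
	w.Write([]byte("half\n"))
	fmt.Println(w.lines) // → [first line second half]
}
```

This is why klog output split across writes still comes out as one formatted line per logical log entry.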
diff --git a/pkg/log/klog_formatter.go b/pkg/log/klog_formatter.go
new file mode 100644
index 00000000..51bcb8e8
--- /dev/null
+++ b/pkg/log/klog_formatter.go
@@ -0,0 +1,156 @@
+package log
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+)
+
+// LogFormat represents the log output format
+type LogFormat int
+
+const (
+ // FormatKlog uses klog-style format (Istio default)
+ // Format: I1107 07:53:52.789817 3352004 server.go:652] message
+ FormatKlog LogFormat = iota
+ // FormatStandard uses standard format
+ // Format: 2025-11-07T07:53:52.787717694Z info default message
+ FormatStandard
+)
+
+var (
+ logFormat LogFormat = FormatStandard // Default to standard format
+ logFormatMu sync.RWMutex
+ processID = os.Getpid()
+ // callerSkip: Skip frames to get to user code
+ // Call stack: runtime.Caller -> formatKlogLine -> Logger.log -> Logger.Info/Infof/etc -> user code
+ // So we need to skip 4 frames
+ callerSkip = 4
+)
+
+// SetLogFormat sets the global log output format
+func SetLogFormat(format LogFormat) {
+ logFormatMu.Lock()
+ defer logFormatMu.Unlock()
+ logFormat = format
+}
+
+// GetLogFormat returns the current log output format
+func GetLogFormat() LogFormat {
+ logFormatMu.RLock()
+ defer logFormatMu.RUnlock()
+ return logFormat
+}
+
+// formatKlogLine formats a log line in klog style (Istio format)
+// Format: I1107 07:53:52.789817 3352004 server.go:652] message
+func formatKlogLine(level Level, scope, message string, skip int) string {
+ now := time.Now()
+
+ // Get level prefix: I, W, E, F
+ levelPrefix := levelToKlogPrefix(level)
+
+ // Format timestamp: MMDD HH:MM:SS.microseconds
+ monthDay := fmt.Sprintf("%02d%02d", int(now.Month()), now.Day())
+ timeStr := fmt.Sprintf("%02d:%02d:%02d.%06d",
+ now.Hour(), now.Minute(), now.Second(), now.Nanosecond()/1000)
+ timestamp := fmt.Sprintf("%s %s", monthDay, timeStr)
+
+ // Get caller information (file:line)
+ file, line := getCaller(skip)
+
+ // Format: I1107 07:53:52.789817 3352004 server.go:652] message
+ // Clean message of any newlines
+ message = strings.TrimRight(message, "\n\r")
+ message = strings.ReplaceAll(message, "\n", " ")
+ message = strings.ReplaceAll(message, "\r", " ")
+ message = strings.TrimSpace(message)
+
+ // If scope is provided and not empty, include it in the message
+ if scope != "" && scope != "default" {
+ message = fmt.Sprintf("[%s] %s", scope, message)
+ }
+
+ return fmt.Sprintf("%s%s %d %s:%d] %s",
+ levelPrefix, timestamp, processID, file, line, message)
+}
+
+// levelToKlogPrefix converts log level to klog prefix
+func levelToKlogPrefix(level Level) string {
+ switch level {
+ case ErrorLevel:
+ return "E" // Error
+ case WarnLevel:
+ return "W" // Warning
+ case InfoLevel:
+ return "I" // Info
+ case DebugLevel:
+ return "I" // Debug uses Info prefix in klog (V levels are used for debug)
+ default:
+ return "I"
+ }
+}
+
+// getCaller returns the file and line number of the caller
+func getCaller(skip int) (string, int) {
+ pc, file, line, ok := runtime.Caller(skip)
+ if !ok {
+ return "unknown", 0
+ }
+
+ // Get just the filename, not the full path
+ filename := filepath.Base(file)
+
+ // Try to get function name for better context (optional)
+ // This can be useful for debugging
+ _ = pc // Can use runtime.FuncForPC(pc).Name() if needed
+
+ return filename, line
+}
+
+// Column widths for aligned log output
+const (
+ levelColumnWidth = 6 // "info ", "warn ", "error ", "debug "
+ scopeColumnWidth = 12 // Scope names, e.g., "default "
+)
+
+// formatFixedWidthTimestamp formats a timestamp with fixed width (always 9 digits for nanoseconds)
+// Format: 2025-11-07T08:07:12.640033925Z (always 30 characters)
+func formatFixedWidthTimestamp(t time.Time) string {
+ // Format the base part: 2025-11-07T08:07:12.
+ base := t.UTC().Format("2006-01-02T15:04:05.")
+
+ // Get nanoseconds and pad to 9 digits
+ nanos := t.Nanosecond()
+ nanosStr := fmt.Sprintf("%09d", nanos)
+
+ // Append Z for UTC
+ return base + nanosStr + "Z"
+}
+
+// formatStandardLine formats a log line in standard format with column
alignment
+// Format: timestamp level scope message (aligned columns, single space
between logical fields)
+// Example: 2025-11-07T08:02:28.610950444Z info default FLAG:
--clusterAliases="[]"
+// Note: Padding spaces are used for column alignment, but logical fields are
separated by single space
+func formatStandardLine(timestamp, level, scope, message string) string {
+	// Ensure message has no newlines (should already be cleaned by formatMessage, but double-check)
+ message = strings.TrimRight(message, "\n\r")
+ message = strings.ReplaceAll(message, "\n", " ")
+ message = strings.ReplaceAll(message, "\r", " ")
+ // Remove any leading/trailing whitespace from message
+ message = strings.TrimSpace(message)
+
+	// Align level to fixed width (left-aligned, padded with spaces) for column alignment
+ levelPadded := fmt.Sprintf("%-*s", levelColumnWidth, level)
+
+	// Align scope to fixed width (left-aligned, padded with spaces) for column alignment
+ scopePadded := fmt.Sprintf("%-*s", scopeColumnWidth, scope)
+
+ // Format: timestamp level scope message
+	// Single space after timestamp, then padded level and scope for alignment, then single space before message
+	return fmt.Sprintf("%s %s%s %s", timestamp, levelPadded, scopePadded, message)
+}
diff --git a/pkg/log/klog_interceptor.go b/pkg/log/klog_interceptor.go
new file mode 100644
index 00000000..efcda848
--- /dev/null
+++ b/pkg/log/klog_interceptor.go
@@ -0,0 +1,19 @@
+//go:build !noklog
+// +build !noklog
+
+package log
+
+import (
+ "k8s.io/klog/v2"
+)
+
+func init() {
+ // Automatically intercept klog output when this package is imported
+ setupKlogInterceptor()
+}
+
+// setupKlogInterceptor sets up klog interception
+func setupKlogInterceptor() {
+ interceptor := GetInterceptor()
+ klog.SetOutput(interceptor.Writer())
+}
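The interceptor pattern used here (pointing klog at a custom `io.Writer` via `klog.SetOutput`) can be sketched with the standard library's `log` package. The writer type below is an illustrative stand-in for the interceptor's `Writer()`, not the actual `GetInterceptor` implementation:

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// reformatWriter is a stand-in for the interceptor's Writer(): it receives
// whatever the captured logger emits and rewrites each line before printing.
type reformatWriter struct{}

func (reformatWriter) Write(p []byte) (int, error) {
	msg := strings.TrimRight(string(p), "\n")
	fmt.Printf("info  intercepted   %s\n", msg)
	return len(p), nil
}

func main() {
	// Equivalent of klog.SetOutput(interceptor.Writer()): redirect the
	// library's output into our formatter.
	log.SetFlags(0)
	log.SetOutput(reformatWriter{})
	log.Print("leader election started")
}
```

The same trick works for any library that exposes a `SetOutput(io.Writer)` hook, which is exactly why the `init()` above can transparently capture klog output at import time.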
diff --git a/pkg/log/log.go b/pkg/log/log.go
new file mode 100644
index 00000000..eaef284d
--- /dev/null
+++ b/pkg/log/log.go
@@ -0,0 +1,382 @@
+package log
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "sync"
+ "time"
+)
+
+// Level represents the logging level
+type Level int
+
+const (
+ NoneLevel Level = iota
+ ErrorLevel
+ WarnLevel
+ InfoLevel
+ DebugLevel
+)
+
+var levelNames = map[Level]string{
+ ErrorLevel: "error",
+ WarnLevel: "warn",
+ InfoLevel: "info",
+ DebugLevel: "debug",
+}
+
+// Scope represents a logging scope with its own level and output
+type Scope struct {
+ name string
+ description string
+ outputLevel Level
+ mu sync.RWMutex
+ output io.Writer
+}
+
+// Logger provides logging methods for a scope
+type Logger struct {
+ scope *Scope
+}
+
+// Scope returns the underlying scope
+func (l *Logger) Scope() *Scope {
+ return l.scope
+}
+
+var (
+ scopes = make(map[string]*Scope)
+ scopesMu sync.RWMutex
+ defaultOut io.Writer = os.Stderr
+ globalMu sync.Mutex
+ usePrettyLog bool = false
+ prettyLogMu sync.RWMutex
+ defaultScope = "default"
+ defaultLogger *Logger
+ defaultLoggerOnce sync.Once
+)
+
+// RegisterScope creates and registers a new logging scope
+func RegisterScope(name, description string) *Logger {
+ scopesMu.Lock()
+ defer scopesMu.Unlock()
+
+ if scope, exists := scopes[name]; exists {
+ return &Logger{scope: scope}
+ }
+
+ scope := &Scope{
+ name: name,
+ description: description,
+ outputLevel: InfoLevel,
+ output: defaultOut,
+ }
+
+ scopes[name] = scope
+ return &Logger{scope: scope}
+}
+
+// SetOutputLevel sets the output level for the scope
+func (s *Scope) SetOutputLevel(level Level) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.outputLevel = level
+}
+
+// GetOutputLevel returns the current output level
+func (s *Scope) GetOutputLevel() Level {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.outputLevel
+}
+
+// SetOutput sets the output writer for the scope
+func (s *Scope) SetOutput(w io.Writer) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.output = w
+}
+
+// GetOutput returns the current output writer
+func (s *Scope) GetOutput() io.Writer {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.output
+}
+
+// Name returns the scope name
+func (s *Scope) Name() string {
+ return s.name
+}
+
+// Description returns the scope description
+func (s *Scope) Description() string {
+ return s.description
+}
+
+// SetDefaultOutput sets the default output writer for all scopes
+func SetDefaultOutput(w io.Writer) {
+ globalMu.Lock()
+ defer globalMu.Unlock()
+ defaultOut = w
+}
+
+// SetScopeLevel sets the output level for a specific scope
+func SetScopeLevel(name string, level Level) {
+ scopesMu.RLock()
+ scope, exists := scopes[name]
+ scopesMu.RUnlock()
+
+ if exists {
+ scope.SetOutputLevel(level)
+ }
+}
+
+// SetAllScopesLevel sets the output level for all scopes
+func SetAllScopesLevel(level Level) {
+ scopesMu.RLock()
+ defer scopesMu.RUnlock()
+
+ for _, scope := range scopes {
+ scope.SetOutputLevel(level)
+ }
+}
+
+// FindScope returns a scope by name
+func FindScope(name string) *Scope {
+ scopesMu.RLock()
+ defer scopesMu.RUnlock()
+ return scopes[name]
+}
+
+// AllScopes returns all registered scopes
+func AllScopes() map[string]*Scope {
+ scopesMu.RLock()
+ defer scopesMu.RUnlock()
+
+ result := make(map[string]*Scope)
+ for k, v := range scopes {
+ result[k] = v
+ }
+ return result
+}
+
+// log writes a log message if the level is enabled
+func (l *Logger) log(level Level, format string, args ...interface{}) {
+ if l.scope.GetOutputLevel() < level {
+ return
+ }
+
+ msg := formatMessage(format, args...)
+ levelName := levelNames[level]
+ scopeName := l.scope.Name()
+
+ // Check deduplication
+ dedup := GetDeduplicator()
+ key := LogKey{
+ Scope: scopeName,
+ Level: levelName,
+ Message: msg,
+ }
+
+ shouldLog, _, summary := dedup.ShouldLog(key)
+ if !shouldLog {
+ // Suppress duplicate log
+ return
+ }
+
+ // Append summary if there were duplicates
+ if summary != "" {
+ msg = msg + " " + summary
+ // Clean any newlines that might have been introduced
+ msg = strings.TrimRight(msg, "\n\r")
+ msg = strings.ReplaceAll(msg, "\n", " ")
+ msg = strings.ReplaceAll(msg, "\r", " ")
+ }
+
+ output := l.scope.GetOutput()
+ if output == nil {
+ output = defaultOut
+ }
+
+ var logLine string
+ prettyLogMu.RLock()
+ usePretty := usePrettyLog
+ prettyLogMu.RUnlock()
+
+ if usePretty {
+ prettyFormatter := GetPrettyFormatter()
+		logLine = prettyFormatter.Format(time.Now().UTC(), levelName, scopeName, msg)
+ } else {
+ // Use standard format by default, or klog format if configured
+ logFormatMu.RLock()
+ format := logFormat
+ logFormatMu.RUnlock()
+
+ if format == FormatKlog {
+ // Use klog format with caller information
+			logLine = formatKlogLine(level, scopeName, msg, callerSkip)
+		} else {
+			// Use standard format (default) with fixed-width timestamp
+			timestamp := formatFixedWidthTimestamp(time.Now().UTC())
+			logLine = formatStandardLine(timestamp, levelName, scopeName, msg)
+ }
+ }
+
+ // Ensure logLine has exactly one newline at the end, no more, no less
+	// Remove ALL newlines and carriage returns, then add exactly one newline
+ logLine = strings.TrimRight(logLine, "\n\r \t")
+ if logLine != "" {
+ logLine = logLine + "\n"
+ output.Write([]byte(logLine))
+ }
+}
+
+// Error logs an error message
+func (l *Logger) Error(args ...interface{}) {
+ l.log(ErrorLevel, "%v", args...)
+}
+
+// Errorf logs a formatted error message
+func (l *Logger) Errorf(format string, args ...interface{}) {
+ l.log(ErrorLevel, format, args...)
+}
+
+// Warn logs a warning message
+func (l *Logger) Warn(args ...interface{}) {
+ l.log(WarnLevel, "%v", args...)
+}
+
+// Warnf logs a formatted warning message
+func (l *Logger) Warnf(format string, args ...interface{}) {
+ l.log(WarnLevel, format, args...)
+}
+
+// Info logs an info message
+func (l *Logger) Info(args ...interface{}) {
+ l.log(InfoLevel, "%v", args...)
+}
+
+// Infof logs a formatted info message
+func (l *Logger) Infof(format string, args ...interface{}) {
+ l.log(InfoLevel, format, args...)
+}
+
+// Debug logs a debug message
+func (l *Logger) Debug(args ...interface{}) {
+ l.log(DebugLevel, "%v", args...)
+}
+
+// Debugf logs a formatted debug message
+func (l *Logger) Debugf(format string, args ...interface{}) {
+ l.log(DebugLevel, format, args...)
+}
+
+// formatMessage formats the message with arguments
+// Removes any trailing newlines to prevent blank lines in output
+func formatMessage(format string, args ...interface{}) string {
+ var msg string
+ if len(args) == 0 {
+ msg = format
+ } else {
+ msg = fmt.Sprintf(format, args...)
+ }
+	// Remove any trailing newlines or carriage returns to prevent blank lines
+	msg = strings.TrimRight(msg, "\n\r")
+	// Replace any internal newlines with spaces to keep everything on one line
+ msg = strings.ReplaceAll(msg, "\n", " ")
+ msg = strings.ReplaceAll(msg, "\r", " ")
+ return msg
+}
+
+// formatLogLine formats a log line in standard style (deprecated, use formatStandardLine)
+// Kept for backward compatibility
+func formatLogLine(timestamp, level, scope, message string) string {
+ return formatStandardLine(timestamp, level, scope, message)
+}
+
+// EnablePrettyLogging enables pretty logging with colors and better formatting
+func EnablePrettyLogging() {
+ prettyLogMu.Lock()
+ defer prettyLogMu.Unlock()
+ usePrettyLog = true
+ GetPrettyFormatter()
+}
+
+// DisablePrettyLogging disables pretty logging, reverting to the standard log format
+func DisablePrettyLogging() {
+ prettyLogMu.Lock()
+ defer prettyLogMu.Unlock()
+ usePrettyLog = false
+}
+
+// IsPrettyLoggingEnabled returns whether pretty logging is enabled
+func IsPrettyLoggingEnabled() bool {
+ prettyLogMu.RLock()
+ defer prettyLogMu.RUnlock()
+ return usePrettyLog
+}
+
+// getDefaultLogger returns the default logger instance
+func getDefaultLogger() *Logger {
+ defaultLoggerOnce.Do(func() {
+		defaultLogger = RegisterScope(defaultScope, "Default logging scope")
+ })
+ return defaultLogger
+}
+
+// SetDefaultScope sets the default scope name for package-level logging functions
+// This should be called before any package-level logging functions are used
+func SetDefaultScope(name string) {
+ globalMu.Lock()
+ defer globalMu.Unlock()
+ defaultScope = name
+ // Reset the default logger to use the new scope
+ defaultLogger = RegisterScope(name, "Default logging scope")
+}
+
+// Package-level logging functions using the default scope
+
+// Info logs an info message using the default scope
+func Info(args ...interface{}) {
+ getDefaultLogger().Info(args...)
+}
+
+// Infof logs a formatted info message using the default scope
+func Infof(format string, args ...interface{}) {
+ getDefaultLogger().Infof(format, args...)
+}
+
+// Warn logs a warning message using the default scope
+func Warn(args ...interface{}) {
+ getDefaultLogger().Warn(args...)
+}
+
+// Warnf logs a formatted warning message using the default scope
+func Warnf(format string, args ...interface{}) {
+ getDefaultLogger().Warnf(format, args...)
+}
+
+// Debug logs a debug message using the default scope
+// Debug level is the lowest log level (most verbose)
+func Debug(args ...interface{}) {
+ getDefaultLogger().Debug(args...)
+}
+
+// Debugf logs a formatted debug message using the default scope
+// Debug level is the lowest log level (most verbose)
+func Debugf(format string, args ...interface{}) {
+ getDefaultLogger().Debugf(format, args...)
+}
+
+// Error logs an error message using the default scope
+func Error(args ...interface{}) {
+ getDefaultLogger().Error(args...)
+}
+
+// Errorf logs a formatted error message using the default scope
+func Errorf(format string, args ...interface{}) {
+ getDefaultLogger().Errorf(format, args...)
+}
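The scope design above — an `RWMutex`-guarded output level read on every log call and written only on reconfiguration — can be shown in miniature. This is a self-contained sketch of that gating logic, not the actual `pkg/log` API:

```go
package main

import (
	"fmt"
	"sync"
)

// Level ordering matches the package above: Error < Warn < Info < Debug,
// so "level <= outputLevel" means "verbose enough to emit".
type Level int

const (
	ErrorLevel Level = iota + 1
	WarnLevel
	InfoLevel
	DebugLevel
)

// miniScope is a stripped-down Scope: just an RWMutex-guarded output level.
// Reads happen on every log call, writes only when reconfiguring, which is
// why an RWMutex is preferred over a plain Mutex here.
type miniScope struct {
	mu    sync.RWMutex
	level Level
}

func (s *miniScope) SetOutputLevel(l Level) {
	s.mu.Lock()
	s.level = l
	s.mu.Unlock()
}

func (s *miniScope) enabled(l Level) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return l <= s.level
}

func main() {
	s := &miniScope{level: InfoLevel}
	fmt.Println(s.enabled(DebugLevel)) // false: Debug is more verbose than Info
	s.SetOutputLevel(DebugLevel)
	fmt.Println(s.enabled(DebugLevel)) // true after raising the level
}
```

Note that `log()` above checks `GetOutputLevel() < level` with the same ordering, so a scope at `InfoLevel` suppresses `Debug` output but passes `Info`, `Warn`, and `Error`.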
diff --git a/pkg/log/pretty.go b/pkg/log/pretty.go
new file mode 100644
index 00000000..fd02c7ff
--- /dev/null
+++ b/pkg/log/pretty.go
@@ -0,0 +1,195 @@
+package log
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "sync"
+ "time"
+)
+
+// Color codes for terminal output
+const (
+ ColorReset = "\033[0m"
+ ColorRed = "\033[31m"
+ ColorYellow = "\033[33m"
+ ColorBlue = "\033[34m"
+ ColorCyan = "\033[36m"
+ ColorGray = "\033[90m"
+ ColorGreen = "\033[32m"
+ ColorBold = "\033[1m"
+)
+
+// PrettyFormatter provides pretty-printed log output with colors
+type PrettyFormatter struct {
+ useColors bool
+ colorOutput bool
+ timeFormat string
+ showScope bool
+ maxScopeLen int
+}
+
+var (
+ globalPrettyFormatter *PrettyFormatter
+ prettyOnce sync.Once
+)
+
+// GetPrettyFormatter returns the global pretty formatter instance
+func GetPrettyFormatter() *PrettyFormatter {
+ prettyOnce.Do(func() {
+ globalPrettyFormatter = NewPrettyFormatter()
+ })
+ return globalPrettyFormatter
+}
+
+// NewPrettyFormatter creates a new pretty formatter
+func NewPrettyFormatter() *PrettyFormatter {
+ return &PrettyFormatter{
+ useColors: true,
+ colorOutput: isColorTerminal(os.Stderr),
+ timeFormat: "2006-01-02 15:04:05.000",
+ showScope: true,
+ maxScopeLen: 12,
+ }
+}
+
+// isColorTerminal checks if the output is a color terminal
+func isColorTerminal(w io.Writer) bool {
+ if f, ok := w.(*os.File); ok {
+ return isatty(f.Fd())
+ }
+ return false
+}
+
+// isatty checks if file descriptor is a terminal (simplified version)
+func isatty(fd uintptr) bool {
+ // Check if TERM environment variable is set (indicates terminal)
+ term := os.Getenv("TERM")
+ if term == "" {
+ return false
+ }
+ // Check if NO_COLOR is set (disables colors)
+ if os.Getenv("NO_COLOR") != "" {
+ return false
+ }
+ // Assume it's a terminal if TERM is set and not "dumb"
+ return term != "dumb"
+}
+
+// SetUseColors enables or disables color output
+func (pf *PrettyFormatter) SetUseColors(enable bool) {
+ pf.useColors = enable
+}
+
+// SetTimeFormat sets the time format for log output
+func (pf *PrettyFormatter) SetTimeFormat(format string) {
+ pf.timeFormat = format
+}
+
+// SetShowScope enables or disables scope display
+func (pf *PrettyFormatter) SetShowScope(show bool) {
+ pf.showScope = show
+}
+
+// SetMaxScopeLength sets the maximum scope name length for alignment
+func (pf *PrettyFormatter) SetMaxScopeLength(length int) {
+ pf.maxScopeLen = length
+}
+
+// Format formats a log line with pretty printing
+func (pf *PrettyFormatter) Format(timestamp time.Time, level, scope, message string) string {
+ var parts []string
+
+ // Format timestamp
+ timeStr := timestamp.Format(pf.timeFormat)
+ if pf.useColors && pf.colorOutput {
+ timeStr = ColorGray + timeStr + ColorReset
+ }
+ parts = append(parts, timeStr)
+
+ // Format level with color and icon
+ levelStr := pf.formatLevel(level)
+ parts = append(parts, levelStr)
+
+ // Format scope
+ if pf.showScope {
+ scopeStr := pf.formatScope(scope)
+ parts = append(parts, scopeStr)
+ }
+
+ // Format message (clean any newlines)
+ message = strings.TrimRight(message, "\n\r")
+ message = strings.ReplaceAll(message, "\n", " ")
+ message = strings.ReplaceAll(message, "\r", " ")
+ message = strings.TrimSpace(message)
+ messageStr := pf.formatMessage(message)
+ parts = append(parts, messageStr)
+
+ return strings.Join(parts, " ")
+}
+
+// formatLevel formats the log level with color and icon
+func (pf *PrettyFormatter) formatLevel(level string) string {
+ var color, icon string
+
+ switch strings.ToLower(level) {
+ case "error":
+ color = ColorRed
+ icon = "✗"
+ case "warn":
+ color = ColorYellow
+ icon = "⚠"
+ case "info":
+ color = ColorCyan
+ icon = "ℹ"
+ case "debug":
+ color = ColorGray
+ icon = "•"
+ default:
+ color = ColorReset
+ icon = "•"
+ }
+
+ levelUpper := strings.ToUpper(level)
+ // Pad to 5 characters for alignment
+ if len(levelUpper) < 5 {
+ levelUpper = levelUpper + strings.Repeat(" ", 5-len(levelUpper))
+ }
+
+ if pf.useColors && pf.colorOutput {
+		return fmt.Sprintf("%s%s%s %s%s", color, ColorBold, icon, levelUpper, ColorReset)
+ }
+ // Even without colors, show icon for better readability
+ return fmt.Sprintf("%s %s", icon, levelUpper)
+}
+
+// formatScope formats the scope name with color
+func (pf *PrettyFormatter) formatScope(scope string) string {
+ scopeStr := scope
+ if len(scopeStr) > pf.maxScopeLen {
+ scopeStr = scopeStr[:pf.maxScopeLen]
+ } else {
+		scopeStr = scopeStr + strings.Repeat(" ", pf.maxScopeLen-len(scopeStr))
+ }
+
+ if pf.useColors && pf.colorOutput {
+ return ColorBlue + ColorBold + scopeStr + ColorReset
+ }
+ return scopeStr
+}
+
+// formatMessage formats the message, preserving any existing formatting
+func (pf *PrettyFormatter) formatMessage(message string) string {
+	// Check if message contains "repeated" pattern for deduplication summary
+ if strings.Contains(message, "(repeated") {
+ if pf.useColors && pf.colorOutput {
+ // Highlight the repeated count
+ parts := strings.Split(message, "(repeated")
+ if len(parts) == 2 {
+				return parts[0] + ColorYellow + "(repeated" + parts[1] + ColorReset
+ }
+ }
+ }
+ return message
+}
diff --git a/sail/cmd/sail-discovery/app/cmd.go b/sail/cmd/sail-discovery/app/cmd.go
index 59dd2fce..329202bb 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -19,6 +19,7 @@ package app
import (
"fmt"
+
"github.com/apache/dubbo-kubernetes/pkg/cmd"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/ctrlz"
@@ -79,9 +80,10 @@ func newDiscoveryCommand() *cobra.Command {
				return fmt.Errorf("failed to start discovery service: %v", err)
}
+ // Wait for signal - when received, immediately exit
cmd.WaitSignal(stop)
- discoveryServer.WaitUntilCompletion()
+ // Signal received, exit immediately
return nil
},
}
diff --git a/sail/pkg/bootstrap/certcontroller.go b/sail/pkg/bootstrap/certcontroller.go
index b6805a3c..7c732173 100644
--- a/sail/pkg/bootstrap/certcontroller.go
+++ b/sail/pkg/bootstrap/certcontroller.go
@@ -21,13 +21,13 @@ import (
"bytes"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/sleep"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
tb "github.com/apache/dubbo-kubernetes/sail/pkg/trustbundle"
"github.com/apache/dubbo-kubernetes/security/pkg/k8s/chiron"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
certutil "github.com/apache/dubbo-kubernetes/security/pkg/util"
- "k8s.io/klog/v2"
"os"
"path"
"strings"
@@ -41,7 +41,7 @@ const (
)
func (s *Server) updateRootCertAndGenKeyCert() error {
- klog.Infof("update root cert and generate new dns certs")
+ log.Infof("update root cert and generate new dns certs")
caBundle := s.CA.GetCAKeyCertBundle().GetRootCertPem()
	certChain, keyPEM, err := s.CA.GenKeyCert(s.dnsNames, SelfSignedCACertTTL.Get(), false)
if err != nil {
@@ -50,13 +50,13 @@ func (s *Server) updateRootCertAndGenKeyCert() error {
if features.MultiRootMesh {
		// Trigger trust anchor update, this will send PCDS to all sidecars.
-		klog.Infof("Update trust anchor with new root cert")
+		log.Infof("Update trust anchor with new root cert")
		err = s.workloadTrustBundle.UpdateTrustAnchor(&tb.TrustAnchorUpdate{
			TrustAnchorConfig: tb.TrustAnchorConfig{Certs: []string{string(caBundle)}},
Source: tb.SourceDubboCA,
})
if err != nil {
-			klog.Errorf("failed to update trust anchor from source Dubbo CA, err: %v", err)
+			log.Errorf("failed to update trust anchor from source Dubbo CA, err: %v", err)
return err
}
}
@@ -71,7 +71,7 @@ func (s *Server) initFileCertificateWatches(tlsOptions TLSOptions) error {
}
// TODO: Setup watcher for root and restart server if it changes.
for _, file := range []string{tlsOptions.CertFile, tlsOptions.KeyFile} {
- klog.Infof("adding watcher for certificate %s", file)
+ log.Infof("adding watcher for certificate %s", file)
if err := s.fileWatcher.Add(file); err != nil {
return fmt.Errorf("could not watch %v: %v", file, err)
}
@@ -84,7 +84,7 @@ func (s *Server) initFileCertificateWatches(tlsOptions TLSOptions) error {
			case <-keyCertTimerC:
				keyCertTimerC = nil
				if err := s.dubbodCertBundleWatcher.SetFromFilesAndNotify(tlsOptions.KeyFile, tlsOptions.CertFile, tlsOptions.CaCertFile); err != nil {
-					klog.Errorf("Setting keyCertBundle failed: %v", err)
+					log.Errorf("Setting keyCertBundle failed: %v", err)
				}
			case <-s.fileWatcher.Events(tlsOptions.CertFile):
				if keyCertTimerC == nil {
@@ -95,9 +95,9 @@ func (s *Server) initFileCertificateWatches(tlsOptions TLSOptions) error {
					keyCertTimerC = time.After(watchDebounceDelay)
				}
			case err := <-s.fileWatcher.Errors(tlsOptions.CertFile):
-				klog.Errorf("error watching %v: %v", tlsOptions.CertFile, err)
+				log.Errorf("error watching %v: %v", tlsOptions.CertFile, err)
			case err := <-s.fileWatcher.Errors(tlsOptions.KeyFile):
-				klog.Errorf("error watching %v: %v", tlsOptions.KeyFile, err)
+				log.Errorf("error watching %v: %v", tlsOptions.KeyFile, err)
case <-stop:
return
}
@@ -123,7 +123,7 @@ func (s *Server) RotateDNSCertForK8sCA(stop <-chan struct{},
		certChain, keyPEM, _, err := chiron.GenKeyCertK8sCA(s.kubeClient.Kube(), strings.Join(s.dnsNames, ","), defaultCACertPath, signerName, approveCsr, requestedLifetime)
		if err != nil {
-			klog.Errorf("failed regenerating key and cert for dubbod by kubernetes: %v", err)
+			log.Errorf("failed regenerating key and cert for dubbod by kubernetes: %v", err)
			continue
		}
		s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, s.dubbodCertBundleWatcher.GetCABundle())
@@ -135,7 +135,7 @@ func (s *Server) initDNSCertsK8SRA() error {
sailCertProviderName := features.SailCertProvider
	signerName := strings.TrimPrefix(sailCertProviderName, constants.CertProviderKubernetesSignerPrefix)
-	klog.Infof("Generating K8S-signed cert for %v using signer %v", s.dnsNames, signerName)
+	log.Infof("Generating K8S-signed cert for %v using signer %v", s.dnsNames, signerName)
	certChain, keyPEM, _, err = chiron.GenKeyCertK8sCA(s.kubeClient.Kube(), strings.Join(s.dnsNames, ","), "", signerName, true, SelfSignedCACertTTL.Get())
if err != nil {
@@ -153,7 +153,7 @@ func (s *Server) initDNSCertsK8SRA() error {
		newCertChain, newKeyPEM, _, err := chiron.GenKeyCertK8sCA(s.kubeClient.Kube(), strings.Join(s.dnsNames, ","), "", signerName, true, SelfSignedCACertTTL.Get())
		if err != nil {
-			klog.Errorf("failed regenerating key and cert for istiod by kubernetes: %v", err)
+			log.Errorf("failed regenerating key and cert for istiod by kubernetes: %v", err)
		}
		s.dubbodCertBundleWatcher.SetAndNotify(newKeyPEM, newCertChain, newCaBundle)
}
@@ -178,7 +178,7 @@ func (s *Server) initDNSCertsDubbod() error {
if err != nil {
return fmt.Errorf("failed generating dubbod key cert %v", err)
}
-	klog.Infof("Generating dubbod-signed cert for %v:\n %s", s.dnsNames, certChain)
+	log.Infof("Generating dubbod-signed cert for %v:\n %s", s.dnsNames, certChain)
fileBundle, err := detectSigningCABundleAndCRL()
if err != nil {
@@ -196,7 +196,7 @@ func (s *Server) initDNSCertsDubbod() error {
	// check if signing key file exists the cert dir and if the dubbo-generated file
	// exists (only if USE_CACERTS_FOR_SELF_SIGNED_CA is enabled)
	if !detectedSigningCABundle {
-		klog.Infof("Use roots from dubbo-ca-secret")
+		log.Infof("Use roots from dubbo-ca-secret")
		caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
		s.addStartFunc("dubbod server certificate rotation", func(stop <-chan struct{}) error {
@@ -207,7 +207,7 @@ func (s *Server) initDNSCertsDubbod() error {
return nil
})
} else if features.UseCacertsForSelfSignedCA && dubboGenerated {
-		klog.Infof("Use roots from %v and watch", fileBundle.RootCertFile)
+		log.Infof("Use roots from %v and watch", fileBundle.RootCertFile)
		caBundle = s.CA.GetCAKeyCertBundle().GetRootCertPem()
		// Similar code to dubbo-ca-secret: refresh the root cert, but in casecrets
@@ -220,7 +220,7 @@ func (s *Server) initDNSCertsDubbod() error {
})
} else {
- klog.Infof("Use root cert from %v", fileBundle.RootCertFile)
+ log.Infof("Use root cert from %v", fileBundle.RootCertFile)
caBundle, err = os.ReadFile(fileBundle.RootCertFile)
if err != nil {
@@ -242,10 +242,10 @@ func (s *Server) watchRootCertAndGenKeyCert(stop <-chan struct{}) {
caBundle = newRootCert
			certChain, keyPEM, err := s.CA.GenKeyCert(s.dnsNames, SelfSignedCACertTTL.Get(), false)
			if err != nil {
-				klog.Errorf("failed generating dubbod key cert %v", err)
+				log.Errorf("failed generating dubbod key cert %v", err)
			} else {
				s.dubbodCertBundleWatcher.SetAndNotify(keyPEM, certChain, caBundle)
-				klog.Infof("regenerated dubbod dns cert: %s", certChain)
+				log.Infof("regenerated dubbod dns cert: %s", certChain)
}
}
}
diff --git a/sail/pkg/bootstrap/configcontroller.go b/sail/pkg/bootstrap/configcontroller.go
index 85c1a049..5a8ab10f 100644
--- a/sail/pkg/bootstrap/configcontroller.go
+++ b/sail/pkg/bootstrap/configcontroller.go
@@ -25,6 +25,7 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/adsc"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
	configaggregate "github.com/apache/dubbo-kubernetes/sail/pkg/config/aggregate"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/crdclient"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/file"
@@ -37,7 +38,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
"istio.io/api/networking/v1alpha3"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/klog/v2"
"net/url"
"strings"
)
@@ -123,7 +123,7 @@ func (s *Server) initConfigSources(args *SailArgs) (err error) {
return err
}
			s.ConfigStores = append(s.ConfigStores, configController)
-			klog.Infof("Started File configSource %s", configSource.Address)
+			log.Infof("Started File configSource %s", configSource.Address)
case XDS:
			transportCredentials, err := s.getTransportCredentials(args, configSource.TlsSettings)
if err != nil {
@@ -155,22 +155,22 @@ func (s *Server) initConfigSources(args *SailArgs) (err error) {
return fmt.Errorf("MCP: failed running %v", err)
}
			s.ConfigStores = append(s.ConfigStores, configController)
-			klog.Infof("Started XDS configSource %s", configSource.Address)
+			log.Infof("Started XDS configSource %s", configSource.Address)
		case Kubernetes:
			if srcAddress.Path == "" || srcAddress.Path == "/" {
				err2 := s.initK8SConfigStore(args)
				if err2 != nil {
-					klog.Errorf("Error loading k8s: %v", err2)
+					log.Errorf("Error loading k8s: %v", err2)
					return err2
				}
-				klog.Infof("Started Kubernetes configSource %s", configSource.Address)
+				log.Infof("Started Kubernetes configSource %s", configSource.Address)
			} else {
-				klog.Infof("Not implemented, ignore: %v", configSource.Address)
+				log.Infof("Not implemented, ignore: %v", configSource.Address)
				// TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
				// using the cluster name as key to match a secret.
			}
		default:
-			klog.Infof("Ignoring unsupported config source: %v", configSource.Address)
+			log.Infof("Ignoring unsupported config source: %v", configSource.Address)
}
}
return nil
diff --git a/sail/pkg/bootstrap/discovery.go b/sail/pkg/bootstrap/discovery.go
index 2471171f..8432a57b 100644
--- a/sail/pkg/bootstrap/discovery.go
+++ b/sail/pkg/bootstrap/discovery.go
@@ -18,9 +18,6 @@
package bootstrap
import (
- "net/http"
-
- "github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
"github.com/apache/dubbo-kubernetes/sail/pkg/networking/apigen"
"github.com/apache/dubbo-kubernetes/sail/pkg/networking/core"
@@ -32,9 +29,6 @@ import (
func InitGenerators(
s *xds.DiscoveryServer,
cg core.ConfigGenerator,
- systemNameSpace string,
- clusterID cluster.ID,
- internalDebugMux *http.ServeMux,
) {
env := s.Env
generators := map[string]model.XdsResourceGenerator{}
diff --git a/sail/pkg/bootstrap/dubbo_ca.go b/sail/pkg/bootstrap/dubbo_ca.go
index 1721e793..3405d984 100644
--- a/sail/pkg/bootstrap/dubbo_ca.go
+++ b/sail/pkg/bootstrap/dubbo_ca.go
@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/env"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/security"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
	securityModel "github.com/apache/dubbo-kubernetes/sail/pkg/security/model"
@@ -38,7 +39,6 @@ import (
"istio.io/api/security/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/klog/v2"
"os"
"path"
"strings"
@@ -104,7 +104,7 @@ var (
func (s *Server) initCAServer(ca caserver.CertificateAuthority, opts *caOptions) {
	caServer, startErr := caserver.New(ca, maxWorkloadCertTTL.Get(), opts.Authenticators)
if startErr != nil {
- klog.Errorf("failed to create dubbo ca server: %v", startErr)
+ log.Errorf("failed to create dubbo ca server: %v", startErr)
}
s.caServer = caServer
}
@@ -117,7 +117,7 @@ func (s *Server) RunCA(grpc *grpc.Server) {
if err == nil {
tok, err := detectAuthEnv(string(token))
if err != nil {
-			klog.Warningf("Starting with invalid K8S JWT token: %v", err)
+			log.Warnf("Starting with invalid K8S JWT token: %v", err)
} else {
if iss == "" {
iss = tok.Iss
@@ -134,15 +134,15 @@ func (s *Server) RunCA(grpc *grpc.Server) {
oidcAuth, err := authenticate.NewJwtAuthenticator(&jwtRule, nil)
if err == nil {
		s.caServer.Authenticators = append(s.caServer.Authenticators, oidcAuth)
-		klog.Info("Using out-of-cluster JWT authentication")
+		log.Info("Using out-of-cluster JWT authentication")
	} else {
-		klog.Info("K8S token doesn't support OIDC, using only in-cluster auth")
+		log.Info("K8S token doesn't support OIDC, using only in-cluster auth")
}
}
s.caServer.Register(grpc)
- klog.Info("Dubbod CA has started")
+ log.Info("Dubbod CA has started")
}
func (s *Server) loadCACerts(caOpts *caOptions, dir string) error {
@@ -164,7 +164,7 @@ func (s *Server) loadCACerts(caOpts *caOptions, dir string) error {
return nil
}
if bundleExists {
- klog.Infof("incomplete signing CA bundle detected at %s", dir)
+ log.Infof("incomplete signing CA bundle detected at %s", dir)
}
	secret, err := s.kubeClient.Kube().CoreV1().Secrets(caOpts.Namespace).Get(
@@ -177,7 +177,7 @@ func (s *Server) loadCACerts(caOpts *caOptions, dir string) error {
}
// TODO: writing cacerts files from remote cluster will always fail,
-	klog.Infof("cacerts Secret found in config cluster, saving contents to %s", dir)
+	log.Infof("cacerts Secret found in config cluster, saving contents to %s", dir)
if err := os.MkdirAll(dir, 0o700); err != nil {
return err
}
@@ -201,10 +201,10 @@ func (s *Server) createDubboRA(opts *caOptions) (ra.RegistrationAuthority, error
// File does not exist.
if certSignerDomain == "" {
-		klog.Infof("CA cert file %q not found, using %q.", caCertFile, defaultCACertPath)
+		log.Infof("CA cert file %q not found, using %q.", caCertFile, defaultCACertPath)
		caCertFile = defaultCACertPath
	} else {
-		klog.Infof("CA cert file %q not found - ignoring.", caCertFile)
+		log.Infof("CA cert file %q not found - ignoring.", caCertFile)
caCertFile = ""
}
}
@@ -270,7 +270,7 @@ func (s *Server) createDubboCA(opts *caOptions) (*ca.DubboCA, error) {
	useSelfSignedCA := !signingCABundleComplete || (features.UseCacertsForSelfSignedCA && dubboGenerated)
	if useSelfSignedCA {
		if features.UseCacertsForSelfSignedCA && dubboGenerated {
-			klog.Infof("DubboGenerated %s secret found, use it as the CA certificate", ca.CACertsSecret)
+			log.Infof("DubboGenerated %s secret found, use it as the CA certificate", ca.CACertsSecret)
		}
		// Either the secret is not mounted because it is named `dubbo-ca-secret`,
@@ -282,7 +282,7 @@ func (s *Server) createDubboCA(opts *caOptions) (*ca.DubboCA, error) {
caOpts.OnRootCertUpdate = s.updateRootCertAndGenKeyCert
} else {
		// The secret is mounted and the "dubbo-generated" key is not used.
-		klog.Info("Use local CA certificate")
+		log.Info("Use local CA certificate")
		caOpts, err = ca.NewPluggedCertDubboCAOptions(fileBundle, workloadCertTTL.Get(), maxWorkloadCertTTL.Get(), caRSAKeySize.Get())
if err != nil {
@@ -293,10 +293,10 @@ func (s *Server) createDubboCA(opts *caOptions) (*ca.DubboCA, error) {
// CRL is only supported for Plugged CA.
		// If CRL file is present, read and notify it for initial replication
		if len(fileBundle.CRLFile) > 0 {
-			klog.Infof("CRL file %s found, notifying it for initial replication", fileBundle.CRLFile)
+			log.Infof("CRL file %s found, notifying it for initial replication", fileBundle.CRLFile)
			crlBytes, crlErr := os.ReadFile(fileBundle.CRLFile)
			if crlErr != nil {
-				klog.Errorf("failed to read CRL file %s: %v", fileBundle.CRLFile, crlErr)
+				log.Errorf("failed to read CRL file %s: %v", fileBundle.CRLFile, crlErr)
return nil, crlErr
}
@@ -321,13 +321,13 @@ func (s *Server) initCACertsAndCRLWatcher() {
s.cacertsWatcher, err = fsnotify.NewWatcher()
if err != nil {
- klog.Errorf("failed to add CAcerts watcher: %v", err)
+ log.Errorf("failed to add CAcerts watcher: %v", err)
return
}
err = s.addCACertsFileWatcher(LocalCertDir.Get())
if err != nil {
- klog.Errorf("failed to add CAcerts file watcher: %v", err)
+ log.Errorf("failed to add CAcerts file watcher: %v", err)
return
}
@@ -344,7 +344,7 @@ func (s *Server) handleCACertsFileWatch() {
case event, ok := <-s.cacertsWatcher.Events:
if !ok {
- klog.V(2).Infof("plugin cacerts watch stopped")
+ log.Debugf("plugin cacerts watch stopped")
return
}
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
@@ -355,7 +355,7 @@ func (s *Server) handleCACertsFileWatch() {
case err := <-s.cacertsWatcher.Errors:
if err != nil {
- klog.Errorf("failed to catch events on cacerts file: %v", err)
+ log.Errorf("failed to catch events on cacerts file: %v", err)
return
}
@@ -387,7 +387,7 @@ func detectAuthEnv(jwt string) (*authenticate.JwtPayload, error) {
}
func handleEvent(s *Server) {
- klog.Info("Update Dubbod cacerts")
+ log.Info("Update Dubbod cacerts")
var newCABundle []byte
var err error
@@ -395,14 +395,14 @@ func handleEvent(s *Server) {
fileBundle, err := detectSigningCABundleAndCRL()
if err != nil {
- klog.Errorf("unable to determine signing file format %v", err)
+ log.Errorf("unable to determine signing file format %v", err)
return
}
// check if CA bundle is updated
newCABundle, err = os.ReadFile(fileBundle.RootCertFile)
if err != nil {
- klog.Errorf("failed reading root-cert.pem: %v", err)
+ log.Errorf("failed reading root-cert.pem: %v", err)
return
}
@@ -411,7 +411,7 @@ func handleEvent(s *Server) {
// Only updating intermediate CA is supported now
if !bytes.Equal(currentCABundle, newCABundle) {
if !features.MultiRootMesh {
- klog.Info("Multi root is disabled, updating new ROOT-CA not supported")
+ log.Info("Multi root is disabled, updating new ROOT-CA not supported")
return
}
@@ -419,10 +419,10 @@ func handleEvent(s *Server) {
// we need to make the new CA bundle contain both old and new CA certs
if bytes.Contains(currentCABundle, newCABundle) || bytes.Contains(newCABundle, currentCABundle) {
- klog.Info("Updating new ROOT-CA")
+ log.Info("Updating new ROOT-CA")
updateRootCA = true
} else {
- klog.V(2).Info("Updating new ROOT-CA not supported")
+ log.Info("Updating new ROOT-CA not supported")
return
}
}
@@ -436,18 +436,18 @@ func handleEvent(s *Server) {
// handleEvent can be triggered either for key-cert bundle update or
// for crl file update. So, even if there is an error in reading crl file,
// we should log error and continue with key-cert bundle update.
- klog.Errorf("failed reading crl file: %v", crlReadErr)
+ log.Errorf("failed reading crl file: %v", crlReadErr)
}
if !bytes.Equal(currentCRLData, crlData) {
- klog.Infof("Updating CRL data")
+ log.Infof("Updating CRL data")
updateCRL = true
}
}
}
if !updateRootCA && !updateCRL {
- klog.Info("No changes detected in root cert or CRL file data, skipping update")
+ log.Info("No changes detected in root cert or CRL file data, skipping update")
return
}
@@ -460,7 +460,7 @@ func handleEvent(s *Server) {
fileBundle.CRLFile,
)
if err != nil {
- klog.Errorf("Failed to update new Plug-in CA certs: %v", err)
+ log.Errorf("Failed to update new Plug-in CA certs: %v", err)
return
}
if len(s.CA.GetCAKeyCertBundle().GetRootCertPem()) != 0 {
@@ -470,26 +470,26 @@ func handleEvent(s *Server) {
// notify watcher to replicate new or updated crl data
if updateCRL {
s.dubbodCertBundleWatcher.SetAndNotifyCACRL(s.CA.GetCAKeyCertBundle().GetCRLPem())
- klog.Infof("Dubbod has detected the newly added CRL file and updated its CRL accordingly")
+ log.Infof("Dubbod has detected the newly added CRL file and updated its CRL accordingly")
}
err = s.updateRootCertAndGenKeyCert()
if err != nil {
- klog.Errorf("Failed generating plugged-in dubbod key cert: %v", err)
+ log.Errorf("Failed generating plugged-in dubbod key cert: %v", err)
return
}
- klog.Info("Dubbod has detected the newly added intermediate CA and updated its key and certs accordingly")
+ log.Info("Dubbod has detected the newly added intermediate CA and updated its key and certs accordingly")
}
func (s *Server) addCACertsFileWatcher(dir string) error {
err := s.cacertsWatcher.Add(dir)
if err != nil {
- klog.Infof("failed to add cacerts file watcher for %s: %v", dir, err)
+ log.Infof("failed to add cacerts file watcher for %s: %v", dir, err)
return err
}
- klog.Infof("Added cacerts files watcher at %v", dir)
+ log.Infof("Added cacerts files watcher at %v", dir)
return nil
}
@@ -498,7 +498,7 @@ func (s *Server) createSelfSignedCACertificateOptions(fileBundle *ca.SigningCAFi
var caOpts *ca.DubboCAOptions
var err error
if s.kubeClient != nil {
- klog.Info("Use self-signed certificate as the CA certificate")
+ log.Info("Use self-signed certificate as the CA certificate")
// Abort after 20 minutes.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
@@ -513,7 +513,7 @@ func (s *Server) createSelfSignedCACertificateOptions(fileBundle *ca.SigningCAFi
opts.Namespace, s.kubeClient.Kube().CoreV1(), fileBundle.RootCertFile,
enableJitterForRootCertRotator.Get(), caRSAKeySize.Get())
} else {
- klog.Infof("Use local self-signed CA certificate for testing. Will use in-memory root CA, no K8S access and no ca key file %s", fileBundle.SigningKeyFile)
+ log.Infof("Use local self-signed CA certificate for testing. Will use in-memory root CA, no K8S access and no ca key file %s", fileBundle.SigningKeyFile)
caOpts, err = ca.NewSelfSignedDebugDubboCAOptions(fileBundle.RootCertFile, SelfSignedCACertTTL.Get(),
workloadCertTTL.Get(), maxWorkloadCertTTL.Get(), opts.TrustDomain, caRSAKeySize.Get())
@@ -557,7 +557,7 @@ func detectSigningCABundleAndCRL() (ca.SigningCAFileBundle, error) {
// looking for tls file format (tls.crt)
if _, err := os.Stat(tlsSigningFile); err == nil {
- klog.Info("Using kubernetes.io/tls secret type for signing ca files")
+ log.Info("Using kubernetes.io/tls secret type for signing ca files")
return ca.SigningCAFileBundle{
RootCertFile: path.Join(LocalCertDir.Get(), ca.TLSSecretRootCertFile),
CertChainFiles: []string{
@@ -571,7 +571,7 @@ func detectSigningCABundleAndCRL() (ca.SigningCAFileBundle, error) {
return ca.SigningCAFileBundle{}, err
}
- klog.Info("Using dubbod file format for signing ca files")
+ log.Info("Using dubbod file format for signing ca files")
// default ca file format
signingCAFileBundle := ca.SigningCAFileBundle{
RootCertFile: path.Join(LocalCertDir.Get(), ca.RootCertFile),
@@ -584,7 +584,7 @@ func detectSigningCABundleAndCRL() (ca.SigningCAFileBundle, error) {
// load crl file if it exists
crlFilePath := path.Join(LocalCertDir.Get(), ca.CACRLFile)
if _, err := os.Stat(crlFilePath); err == nil {
- klog.Info("Detected CRL file")
+ log.Info("Detected CRL file")
signingCAFileBundle.CRLFile = crlFilePath
}
}
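The handleEvent hunks above rotate the root CA only when the bundles differ and one contains the other (so old and new certs coexist during a multi-root rollout). A minimal sketch of that decision, with a hypothetical helper name — the real function also reads files, updates CRL state, and regenerates dubbod's key and certs:

```go
package main

import "bytes"

// shouldUpdateRootCA is a hypothetical distillation of the root-CA branch of
// handleEvent: skip if nothing changed, refuse when multi-root is disabled,
// and otherwise accept only when one bundle is a superset of the other.
func shouldUpdateRootCA(multiRootMesh bool, current, next []byte) bool {
	if bytes.Equal(current, next) {
		return false // no change detected
	}
	if !multiRootMesh {
		return false // "Multi root is disabled, updating new ROOT-CA not supported"
	}
	// old and new CA certs must coexist in one of the bundles
	return bytes.Contains(current, next) || bytes.Contains(next, current)
}
```

This mirrors the guard in the diff: a disjoint replacement bundle is rejected even with multi-root enabled, because trust in the old root would be dropped abruptly.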
diff --git a/sail/pkg/bootstrap/mesh.go b/sail/pkg/bootstrap/mesh.go
index 1121cafa..98db06ac 100644
--- a/sail/pkg/bootstrap/mesh.go
+++ b/sail/pkg/bootstrap/mesh.go
@@ -23,9 +23,9 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
- "k8s.io/klog/v2"
"os"
"sigs.k8s.io/yaml"
)
@@ -35,21 +35,21 @@ const (
)
func (s *Server) initMeshConfiguration(args *SailArgs, fileWatcher filewatcher.FileWatcher) {
- klog.Infof("initializing mesh configuration %v", args.MeshConfigFile)
+ log.Infof("initializing mesh configuration %v", args.MeshConfigFile)
col := s.getMeshConfiguration(args, fileWatcher)
col.AsCollection().WaitUntilSynced(s.internalStop)
s.environment.Watcher = meshwatcher.ConfigAdapter(col)
- klog.Infof("mesh configuration: %s", meshwatcher.PrettyFormatOfMeshConfig(s.environment.Mesh()))
+ log.Infof("mesh configuration: %s", meshwatcher.PrettyFormatOfMeshConfig(s.environment.Mesh()))
argsdump, _ := yaml.Marshal(args)
- klog.Infof("flags: \n%s", argsdump)
+ log.Infof("flags: \n%s", argsdump)
}
func (s *Server) initMeshNetworks(args *SailArgs, fileWatcher filewatcher.FileWatcher) {
- klog.Infof("initializing mesh networks configuration %v", args.NetworksConfigFile)
+ log.Infof("initializing mesh networks configuration %v", args.NetworksConfigFile)
col := s.getMeshNetworks(args, fileWatcher)
col.AsCollection().WaitUntilSynced(s.internalStop)
s.environment.NetworksWatcher = meshwatcher.NetworksAdapter(col)
- klog.Infof("mesh networks configuration: %s", meshwatcher.PrettyFormatOfMeshNetworks(s.environment.MeshNetworks()))
+ log.Infof("mesh networks configuration: %s", meshwatcher.PrettyFormatOfMeshNetworks(s.environment.MeshNetworks()))
}
func (s *Server) getMeshNetworks(args *SailArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshNetworksResource] {
@@ -58,7 +58,7 @@ func (s *Server) getMeshNetworks(args *SailArgs, fileWatcher filewatcher.FileWat
opts := krt.NewOptionsBuilder(s.internalStop, "", args.KrtDebugger)
sources := s.getConfigurationSources(args, fileWatcher, args.NetworksConfigFile, kubemesh.MeshNetworksKey)
if len(sources) == 0 {
- klog.Infof("Using default mesh networks - missing file %s and no k8s client", args.NetworksConfigFile)
+ log.Infof("Using default mesh networks - missing file %s and no k8s client", args.NetworksConfigFile)
}
return meshwatcher.NewNetworksCollection(opts, sources...)
}
@@ -88,6 +88,7 @@ func (s *Server) getConfigurationSources(args *SailArgs, fileWatcher filewatcher
if s.kubeClient == nil {
return nil
}
+
configMapName := getMeshConfigMapName("")
primary := kubemesh.NewConfigMapSource(s.kubeClient, args.Namespace, configMapName, cmKey, opts)
return toSources(primary, userMeshConfig)
diff --git a/sail/pkg/bootstrap/proxylessinjector.go b/sail/pkg/bootstrap/proxylessinjector.go
index 1d7d1143..7e490cad 100644
--- a/sail/pkg/bootstrap/proxylessinjector.go
+++ b/sail/pkg/bootstrap/proxylessinjector.go
@@ -20,6 +20,7 @@ package bootstrap
import (
"context"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"os"
"path/filepath"
@@ -29,7 +30,6 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/klog/v2"
)
const (
@@ -43,7 +43,7 @@ func (s *Server) initProxylessInjector(args *SailArgs) (*inject.Webhook, error)
// currently the constant: "./var/lib/dubbo/inject"
injectPath := args.InjectionOptions.InjectionDirectory
if injectPath == "" || !injectionEnabled.Get() {
- klog.Infof("Skipping proxyless injector, injection path is missing or disabled.")
+ log.Infof("Skipping proxyless injector, injection path is missing or disabled.")
return nil, nil
}
@@ -61,18 +61,18 @@ func (s *Server) initProxylessInjector(args *SailArgs) (*inject.Webhook, error)
cms := s.kubeClient.Kube().CoreV1().ConfigMaps(args.Namespace)
if _, err := cms.Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {
if errors.IsNotFound(err) {
- klog.Infof("Skipping proxyless injector, template not found")
+ log.Infof("Skipping proxyless injector, template not found")
return nil, nil
}
return nil, err
}
watcher = inject.NewConfigMapWatcher(s.kubeClient, args.Namespace, configMapName, "config", "values")
} else {
- klog.Infof("Skipping proxyless injector, template not found")
+ log.Infof("Skipping proxyless injector, template not found")
return nil, nil
}
- klog.Info("initializing proxyless injector")
+ log.Info("initializing proxyless injector")
parameters := inject.WebhookParameters{
Watcher: watcher,
@@ -91,7 +91,7 @@ func (s *Server) initProxylessInjector(args *SailArgs) (*inject.Webhook, error)
s.addStartFunc("injection patcher", func(stop <-chan struct{}) error {
patcher, err := webhooks.NewWebhookCertPatcher(s.kubeClient, webhookName, args.Revision, s.dubbodCertBundleWatcher)
if err != nil {
- klog.Errorf("failed to create webhook cert patcher: %v", err)
+ log.Errorf("failed to create webhook cert patcher: %v", err)
return nil
}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index c41c364d..19adb183 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
"github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
"github.com/apache/dubbo-kubernetes/pkg/kube/namespace"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/network"
"github.com/apache/dubbo-kubernetes/pkg/spiffe"
@@ -64,7 +65,6 @@ import (
meshconfig "istio.io/api/mesh/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
- "k8s.io/klog"
"net"
"net/http"
"os"
@@ -209,7 +209,6 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
}
if caOpts.ExternalCAType == ra.ExtCAK8s {
- // Older environment variable preserved for backward compatibility
caOpts.ExternalCASigner = k8sSigner
}
// CA signing certificate must be created first if needed.
@@ -221,7 +220,7 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
return nil, err
}
- InitGenerators(s.XDSServer, configGen, args.Namespace, s.clusterID, s.internalDebugMux)
+ InitGenerators(s.XDSServer, configGen)
dubbodHost, _, err := e.GetDiscoveryAddress()
if err != nil {
@@ -274,7 +273,7 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
}
func (s *Server) Start(stop <-chan struct{}) error {
- klog.Infof("Starting Dubbod Server with primary cluster %s", s.clusterID)
+ log.Infof("Starting Dubbod Server with primary cluster %s", s.clusterID)
if err := s.server.Start(stop); err != nil {
return err
}
@@ -291,9 +290,9 @@ func (s *Server) Start(stop <-chan struct{}) error {
return err
}
go func() {
- klog.Infof("starting secure gRPC discovery service at %s", grpcListener.Addr())
+ log.Infof("starting secure gRPC discovery service at %s", grpcListener.Addr())
if err := s.secureGrpcServer.Serve(grpcListener); err != nil {
- klog.Errorf("error serving secure GRPC server: %v", err)
+ log.Errorf("error serving secure GRPC server: %v", err)
}
}()
}
@@ -304,9 +303,9 @@ func (s *Server) Start(stop <-chan struct{}) error {
return err
}
go func() {
- klog.Infof("starting gRPC discovery service at %s", grpcListener.Addr())
+ log.Infof("starting gRPC discovery service at %s", grpcListener.Addr())
if err := s.grpcServer.Serve(grpcListener); err != nil {
- klog.Errorf("error serving GRPC server: %v", err)
+ log.Errorf("error serving GRPC server: %v", err)
}
}()
}
@@ -317,9 +316,9 @@ func (s *Server) Start(stop <-chan struct{}) error {
return err
}
go func() {
- klog.Infof("starting webhook service at %s", httpsListener.Addr())
+ log.Infof("starting webhook service at %s", httpsListener.Addr())
if err := s.httpsServer.ServeTLS(httpsListener, "", ""); network.IsUnexpectedListenerError(err) {
- klog.Errorf("error serving https server: %v", err)
+ log.Errorf("error serving https server: %v", err)
}
}()
s.httpsAddr = httpsListener.Addr().String()
@@ -331,16 +330,16 @@ func (s *Server) Start(stop <-chan struct{}) error {
}
func (s *Server) initDiscoveryService() {
- klog.Infof("starting discovery service")
+ log.Infof("starting discovery service")
s.addStartFunc("xds server", func(stop <-chan struct{}) error {
- klog.Infof("Starting ADS server")
+ log.Infof("Starting ADS server")
s.XDSServer.Start(stop)
return nil
})
}
func (s *Server) initRegistryEventHandlers() {
- klog.Info("initializing registry event handlers")
+ log.Info("initializing registry event handlers")
if s.configController != nil {
configHandler := func(prev config.Config, curr config.Config, event model.Event) {}
@@ -378,10 +377,10 @@ func (s *Server) startCA(caOpts *caOptions) {
}
// init the RA server if configured, else start init CA server
if s.RA != nil {
- klog.Infof("initializing CA server with RA")
+ log.Infof("initializing CA server with RA")
s.initCAServer(s.RA, caOpts)
} else if s.CA != nil {
- klog.Infof("initializing CA server with Dubbod CA")
+ log.Infof("initializing CA server with Dubbod CA")
s.initCAServer(s.CA, caOpts)
}
s.addStartFunc("ca", func(stop <-chan struct{}) error {
@@ -389,7 +388,7 @@ func (s *Server) startCA(caOpts *caOptions) {
if s.secureGrpcServer == nil {
grpcServer = s.grpcServer
}
- klog.Infof("Starting CA server")
+ log.Infof("Starting CA server")
s.RunCA(grpcServer)
return nil
})
@@ -457,7 +456,7 @@ func (s *Server) initKubeClient(args *SailArgs) error {
}
func (s *Server) initMeshHandlers(changeHandler func(_ *meshconfig.MeshConfig)) {
- klog.Info("initializing mesh handlers")
+ log.Info("initializing mesh handlers")
// When the mesh config or networks change, do a full push.
s.environment.AddMeshHandler(func() {
changeHandler(s.environment.Mesh())
@@ -477,7 +476,7 @@ func (s *Server) initServers(args *SailArgs) {
} else {
// This happens only if the GRPC port (15010) is disabled. We will multiplex
// it on the HTTP port. Does not impact the HTTPS gRPC or HTTPS.
- klog.Infof("multiplexing gRPC on http addr %v", args.ServerOptions.HTTPAddr)
+ log.Infof("multiplexing gRPC on http addr %v", args.ServerOptions.HTTPAddr)
multiplexGRPC = true
}
h2s := &http2.Server{
@@ -515,7 +514,7 @@ func (s *Server) initGrpcServer(options *dubbokeepalive.Options) {
}
func (s *Server) initControllers(args *SailArgs) error {
- klog.Info("initializing controllers")
+ log.Info("initializing controllers")
s.initMulticluster(args)
@@ -532,7 +531,7 @@ func (s *Server) initControllers(args *SailArgs) error {
func (s *Server) initSecureDiscoveryService(args *SailArgs, trustDomain string) error {
if args.ServerOptions.SecureGRPCAddr == "" {
- klog.Info("The secure discovery port is disabled, multiplexing on httpAddr ")
+ log.Info("The secure discovery port is disabled, multiplexing on httpAddr ")
return nil
}
@@ -542,10 +541,10 @@ func (s *Server) initSecureDiscoveryService(args *SailArgs, trustDomain string)
}
if peerCertVerifier == nil {
// Running locally without configured certs - no TLS mode
- klog.Warningf("The secure discovery service is disabled")
+ log.Warnf("The secure discovery service is disabled")
return nil
}
- klog.Info("initializing secure discovery service")
+ log.Info("initializing secure discovery service")
cfg := &tls.Config{
GetCertificate: s.getDubbodCertificate,
@@ -554,7 +553,7 @@ func (s *Server) initSecureDiscoveryService(args *SailArgs, trustDomain string)
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
err := peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
if err != nil {
- klog.Infof("Could not verify certificate: %v", err)
+ log.Infof("Could not verify certificate: %v", err)
}
return err
},
@@ -649,9 +648,9 @@ func (s *Server) serveHTTP() error {
return err
}
go func() {
- klog.Infof("starting HTTP service at %s", httpListener.Addr())
+ log.Infof("starting HTTP service at %s", httpListener.Addr())
if err := s.httpServer.Serve(httpListener); network.IsUnexpectedListenerError(err) {
- klog.Errorf("error serving http server: %v", err)
+ log.Errorf("error serving http server: %v", err)
}
}()
s.httpAddr = httpListener.Addr().String()
@@ -662,7 +661,7 @@ func (s *Server) serveHTTP() error {
func (s *Server) maybeCreateCA(caOpts *caOptions) error {
// CA signing certificate must be created only if CA is enabled.
if features.EnableCAServer {
- klog.Info("creating CA and initializing public key")
+ log.Info("creating CA and initializing public key")
var err error
if useRemoteCerts.Get() {
if err = s.loadCACerts(caOpts, LocalCertDir.Get()); err != nil {
@@ -708,7 +707,7 @@ func (s *Server) initSDSServer() {
}
if !features.EnableXDSIdentityCheck {
// Make sure we have security
- klog.Warningf("skipping Kubernetes credential reader; SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
+ log.Warnf("skipping Kubernetes credential reader; SAIL_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
} else {
// TODO ConfigUpdated Multicluster get secret and configmap
}
@@ -766,11 +765,11 @@ func (s *Server) waitForShutdown(stop <-chan struct{}) {
ctx, cancel := context.WithTimeout(context.Background(), s.shutdownDuration)
defer cancel()
if err := s.httpServer.Shutdown(ctx); err != nil {
- klog.Error(err)
+ log.Error(err)
}
if s.httpsServer != nil {
if err := s.httpsServer.Shutdown(ctx); err != nil {
- klog.Error(err)
+ log.Error(err)
}
}
@@ -793,7 +792,7 @@ func (s *Server) cachesSynced() bool {
func (s *Server) pushContextReady(expected int64) bool {
committed := s.XDSServer.CommittedUpdates.Load()
if committed < expected {
- klog.V(2).Infof("Waiting for pushcontext to process inbound updates, inbound: %v, committed : %v", expected, committed)
+ log.Debugf("Waiting for pushcontext to process inbound updates, inbound: %v, committed : %v", expected, committed)
return false
}
return true
@@ -801,12 +800,12 @@ func (s *Server) pushContextReady(expected int64) bool {
func (s *Server) waitForCacheSync(stop <-chan struct{}) bool {
start := time.Now()
- klog.Info("Waiting for caches to be synced")
+ log.Info("Waiting for caches to be synced")
if !kubelib.WaitForCacheSync("server", stop, s.cachesSynced) {
- klog.Errorf("Failed waiting for cache sync")
+ log.Errorf("Failed waiting for cache sync")
return false
}
- klog.Infof("All controller caches have been synced up in %v", time.Since(start))
+ log.Infof("All controller caches have been synced up in %v", time.Since(start))
expected := s.XDSServer.InboundUpdates.Load()
return kubelib.WaitForCacheSync("push context", stop, func() bool { return s.pushContextReady(expected) })
}
@@ -825,20 +824,20 @@ func (s *Server) initDubbodCerts(args *SailArgs, host string) error {
})
if err != nil {
// Not crashing dubbod - This typically happens if certs are missing and in tests.
- klog.Errorf("error initializing certificate watches: %v", err)
+ log.Errorf("error initializing certificate watches: %v", err)
return nil
}
} else if features.EnableCAServer && features.SailCertProvider == constants.CertProviderDubbod {
- klog.Infof("initializing Dubbod DNS certificates host: %s, custom host: %s", host, features.DubbodServiceCustomHost)
+ log.Infof("initializing Dubbod DNS certificates host: %s, custom host: %s", host, features.DubbodServiceCustomHost)
err = s.initDNSCertsDubbod()
} else if features.SailCertProvider == constants.CertProviderKubernetes {
- klog.Warningf("SAIL_CERT_PROVIDER=kubernetes is no longer supported by upstream K8S")
+ log.Warnf("SAIL_CERT_PROVIDER=kubernetes is no longer supported by upstream K8S")
} else if strings.HasPrefix(features.SailCertProvider, constants.CertProviderKubernetesSignerPrefix) {
- klog.Infof("initializing Dubbod DNS certificates using K8S RA:%s host: %s, custom host: %s", features.SailCertProvider,
+ log.Infof("initializing Dubbod DNS certificates using K8S RA:%s host: %s, custom host: %s", features.SailCertProvider,
host, features.DubbodServiceCustomHost)
err = s.initDNSCertsK8SRA()
} else {
- klog.Warningf("SAIL_CERT_PROVIDER=%s is not implemented", features.SailCertProvider)
+ log.Warnf("SAIL_CERT_PROVIDER=%s is not implemented", features.SailCertProvider)
}
if err == nil {
@@ -851,7 +850,7 @@ func (s *Server) initDubbodCerts(args *SailArgs, host string) error {
func (s *Server) dubbodReadyHandler(w http.ResponseWriter, _ *http.Request) {
for name, fn := range s.readinessProbes {
if ready := fn(); !ready {
- klog.Warningf("%s is not ready", name)
+ log.Warnf("%s is not ready", name)
w.WriteHeader(http.StatusServiceUnavailable)
return
}
@@ -887,7 +886,7 @@ func getDNSNames(args *SailArgs, host string) []string {
sans := sets.New(cHosts...)
sans.Insert(host)
dnsNames := sets.SortedList(sans)
- klog.Infof("Discover server subject alt names: %v", dnsNames)
+ log.Infof("Discover server subject alt names: %v", dnsNames)
return dnsNames
}
@@ -954,13 +953,13 @@ func (s *Server) loadDubbodCert() error {
return fmt.Errorf("x509 cert - ParseCertificates() error: %v", err)
}
for _, c := range x509Cert {
- klog.Infof("x509 cert - Issuer: %q, Subject: %q, SN: %x, NotBefore: %q, NotAfter: %q",
+ log.Infof("x509 cert - Issuer: %q, Subject: %q, SN: %x, NotBefore: %q, NotAfter: %q",
c.Issuer, c.Subject, c.SerialNumber, c.NotBefore.Format(time.RFC3339), c.NotAfter.Format(time.RFC3339))
}
}
- klog.Info("Dubbod certificates are reloaded")
+ log.Info("Dubbod certificates are reloaded")
s.certMu.Lock()
s.dubbodCert = &keyPair
s.certMu.Unlock()
@@ -974,7 +973,7 @@ func (s *Server) reloadDubbodCert(watchCh <-chan struct{}, stopCh <-chan struct{
return
case <-watchCh:
if err := s.loadDubbodCert(); err != nil {
- klog.Errorf("reload dubbod cert failed: %v", err)
+ log.Errorf("reload dubbod cert failed: %v", err)
}
}
}
diff --git a/sail/pkg/bootstrap/servicecontroller.go b/sail/pkg/bootstrap/servicecontroller.go
index b4963153..4fa83bd3 100644
--- a/sail/pkg/bootstrap/servicecontroller.go
+++ b/sail/pkg/bootstrap/servicecontroller.go
@@ -19,12 +19,12 @@ package bootstrap
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/aggregate"
kubecontroller "github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/kube/controller"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
- "k8s.io/klog/v2"
)
func (s *Server) ServiceController() *aggregate.Controller {
@@ -39,11 +39,11 @@ func (s *Server) initServiceControllers(args *SailArgs) error {
for _, r := range args.RegistryOptions.Registries {
serviceRegistry := provider.ID(r)
if registered.Contains(serviceRegistry) {
- klog.Infof("%s registry specified multiple times.", r)
+ log.Infof("%s registry specified multiple times.", r)
continue
}
registered.Insert(serviceRegistry)
- klog.Infof("Adding %s registry adapter", serviceRegistry)
+ log.Infof("Adding %s registry adapter", serviceRegistry)
switch serviceRegistry {
case provider.Kubernetes:
if err := s.initKubeRegistry(args); err != nil {
diff --git a/sail/pkg/bootstrap/validation.go b/sail/pkg/bootstrap/validation.go
index 6e1be033..26b4b02a 100644
--- a/sail/pkg/bootstrap/validation.go
+++ b/sail/pkg/bootstrap/validation.go
@@ -18,17 +18,17 @@
package bootstrap
import (
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/server"
"github.com/apache/dubbo-kubernetes/pkg/webhooks/validation/controller"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
- "k8s.io/klog/v2"
)
func (s *Server) initConfigValidation(args *SailArgs) error {
if s.kubeClient == nil {
return nil
}
- klog.Info("initializing config validator")
+ log.Info("initializing config validator")
params := server.Options{
DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
Mux: s.httpsMux,
@@ -41,7 +41,7 @@ func (s *Server) initConfigValidation(args *SailArgs) error {
if features.ValidationWebhookConfigName != "" && s.kubeClient != nil {
s.addStartFunc("validation controller", func(stop <-chan struct{}) error {
- klog.Infof("Starting validation controller")
+ log.Infof("Starting validation controller")
go controller.NewValidatingWebhookController(
s.kubeClient, args.Revision, args.Namespace, s.dubbodCertBundleWatcher).Run(stop)
return nil
diff --git a/sail/pkg/bootstrap/webhook.go b/sail/pkg/bootstrap/webhook.go
index a28f4c87..630c234a 100644
--- a/sail/pkg/bootstrap/webhook.go
+++ b/sail/pkg/bootstrap/webhook.go
@@ -23,8 +23,8 @@ import (
"net/http"
"strings"
+ dubbolog "github.com/apache/dubbo-kubernetes/pkg/log"
sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
- "k8s.io/klog/v2"
)
type httpServerErrorLogWriter struct{}
@@ -32,9 +32,9 @@ type httpServerErrorLogWriter struct{}
func (*httpServerErrorLogWriter) Write(p []byte) (int, error) {
m := strings.TrimSuffix(string(p), "\n")
if strings.HasPrefix(m, "http: TLS handshake error") && strings.HasSuffix(m, ": EOF") {
- klog.V(2).Info(m)
+ dubbolog.Debug(m)
} else {
- klog.Info(m)
+ dubbolog.Info(m)
}
return len(p), nil
}
@@ -42,7 +42,7 @@ func (*httpServerErrorLogWriter) Write(p []byte) (int, error) {
func (s *Server) initSecureWebhookServer(args *SailArgs) {
if args.ServerOptions.HTTPSAddr == "" {
s.httpsMux = s.httpMux
- klog.Infof("HTTPS port is disabled, multiplexing webhooks on the httpAddr %v", args.ServerOptions.HTTPAddr)
+ dubbolog.Infof("HTTPS port is disabled, multiplexing webhooks on the httpAddr %v", args.ServerOptions.HTTPAddr)
return
}
@@ -54,7 +54,7 @@ func (s *Server) initSecureWebhookServer(args *SailArgs) {
// Compliance for control plane validation and injection webhook server.
sec_model.EnforceGoCompliance(tlsConfig)
- klog.Info("initializing secure webhook server for dubbod webhooks")
+ dubbolog.Info("initializing secure webhook server for dubbod webhooks")
// create the https server for hosting the k8s injectionWebhook handlers.
s.httpsMux = http.NewServeMux()
s.httpsServer = &http.Server{
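The webhook.go change above keeps the `httpServerErrorLogWriter`, which demotes chronically noisy `http: TLS handshake error ... : EOF` lines (health probes and port scanners closing connections mid-handshake) while ordinary server errors stay at info. The classification is a pure string check and can be sketched and tested on its own:

```go
package main

import "strings"

// levelFor mirrors the branch in httpServerErrorLogWriter.Write: benign
// handshake EOFs are demoted to debug; anything else http.Server writes to
// its ErrorLog is surfaced at info.
func levelFor(p []byte) string {
	m := strings.TrimSuffix(string(p), "\n")
	if strings.HasPrefix(m, "http: TLS handshake error") && strings.HasSuffix(m, ": EOF") {
		return "debug"
	}
	return "info"
}
```

In the server such a writer is typically attached via `http.Server{ErrorLog: log.New(&httpServerErrorLogWriter{}, "", 0)}` (wiring assumed here, not shown in this hunk).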
diff --git a/sail/pkg/config/kube/crdclient/client.go b/sail/pkg/config/kube/crdclient/client.go
index ffa82c3d..72b5b972 100644
--- a/sail/pkg/config/kube/crdclient/client.go
+++ b/sail/pkg/config/kube/crdclient/client.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
"github.com/apache/dubbo-kubernetes/pkg/kube/krt"
"github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/maps"
"github.com/apache/dubbo-kubernetes/sail/pkg/model"
jsonmerge "github.com/evanphx/json-patch/v5"
@@ -38,7 +39,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/klog/v2"
)
type Client struct {
@@ -51,6 +51,7 @@ type Client struct {
schemas collection.Schemas
client kube.Client
filtersByGVK map[config.GroupVersionKind]kubetypes.Filter
+ logger *log.Logger
}
var _ model.ConfigStoreController = &Client{}
@@ -87,6 +88,7 @@ func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas)
client: client,
filtersByGVK: opts.FiltersByGVK,
stop: stop,
+ logger: log.RegisterScope("crdclient", "Sail Kubernetes CRD controller"),
}
kopts := krt.NewOptionsBuilder(stop, "crdclient", opts.KrtDebugger)
@@ -106,15 +108,15 @@ func (cl *Client) Run(stop <-chan struct{}) {
}
t0 := time.Now()
- klog.Info("Starting Sail Kubernetes CRD controller")
+ cl.logger.Infof("Starting Sail Kubernetes CRD controller")
if !kube.WaitForCacheSync("crdclient", stop, cl.informerSynced) {
- klog.Errorf("Failed to sync Sail Kubernetes CRD controller cache")
+ cl.logger.Infof("Failed to sync Sail Kubernetes CRD controller cache")
} else {
- klog.Infof("Sail Kubernetes CRD controller synced in %v", time.Since(t0))
+ cl.logger.Infof("Sail Kubernetes CRD controller synced in %v", time.Since(t0))
}
<-stop
close(cl.stop)
- klog.Info("controller terminated")
+ cl.logger.Infof("controller terminated")
}
func (cl *Client) HasSynced() bool {
@@ -136,7 +138,7 @@ func (cl *Client) HasSynced() bool {
func (cl *Client) informerSynced() bool {
for gk, ctl := range cl.allKinds() {
if !ctl.collection.HasSynced() {
- klog.Infof("controller %q is syncing...", gk)
+ cl.logger.Infof("controller %q is syncing...", gk)
return false
}
}
@@ -185,10 +187,10 @@ func genPatchBytes(oldRes, modRes runtime.Object, patchType types.PatchType) ([]
}
func (cl *Client) addCRD(name string, opts krt.OptionsBuilder) {
- klog.V(2).Infof("adding CRD %q", name)
+ cl.logger.Debugf("adding CRD %q", name)
s, f := cl.schemasByCRDName[name]
if !f {
- klog.V(2).Infof("added resource that we are not watching: %v", name)
+ cl.logger.Debugf("added resource that we are not watching: %v", name)
return
}
resourceGVK := s.GroupVersionKind()
@@ -197,12 +199,12 @@ func (cl *Client) addCRD(name string, opts krt.OptionsBuilder) {
cl.kindsMu.Lock()
defer cl.kindsMu.Unlock()
if _, f := cl.kinds[resourceGVK]; f {
- klog.V(2).Infof("added resource that already exists: %v", resourceGVK)
+ cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
return
}
translateFunc, f := translationMap[resourceGVK]
if !f {
- klog.Errorf("translation function for %v not found",
resourceGVK)
+ cl.logger.Errorf("translation function for %v not found",
resourceGVK)
return
}
@@ -273,7 +275,7 @@ func (cl *Client) Schemas() collection.Schemas {
func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string)
*config.Config {
h, f := cl.kind(typ)
if !f {
- klog.Warningf("unknown type: %s", typ)
+ cl.logger.Warnf("unknown type: %s", typ)
return nil
}
@@ -286,7 +288,7 @@ func (cl *Client) Get(typ config.GroupVersionKind, name,
namespace string) *conf
obj := h.collection.GetKey(key)
if obj == nil {
- klog.V(2).Infof("couldn't find %s/%s in informer index",
namespace, name)
+ cl.logger.Debugf("couldn't find %s/%s in informer index",
namespace, name)
return nil
}
@@ -374,5 +376,5 @@ func (cl *Client) RegisterEventHandler(kind
config.GroupVersionKind, handler mod
return
}
- klog.Warningf("unknown type: %s", kind)
+ cl.logger.Warnf("unknown type: %s", kind)
}
diff --git a/sail/pkg/leaderelection/leaderelection.go
b/sail/pkg/leaderelection/leaderelection.go
index e295ae45..e98d4cf5 100644
--- a/sail/pkg/leaderelection/leaderelection.go
+++ b/sail/pkg/leaderelection/leaderelection.go
@@ -20,6 +20,7 @@ package leaderelection
import (
"context"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"os"
"sync"
"time"
@@ -31,11 +32,10 @@ import (
"go.uber.org/atomic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
- "k8s.io/klog/v2"
)
const (
- NamespaceController = "istio-namespace-controller-election"
+ NamespaceController = "dubbo-namespace-controller-election"
ClusterTrustBundleController =
"dubbo-clustertrustbundle-controller-election"
)
@@ -127,7 +127,7 @@ func (l *LeaderElection) create()
(*k8sleaderelection.LeaderElector, error) {
leaderCtx := l.leaderCtx
l.leaderMu.Unlock()
- klog.Infof("leader election lock obtained: %v",
l.electionID)
+ log.Infof("leader election lock obtained: %v",
l.electionID)
for _, f := range l.runFns {
go f(leaderCtx.Done())
}
@@ -141,7 +141,7 @@ func (l *LeaderElection) create()
(*k8sleaderelection.LeaderElector, error) {
l.leaderCtx = nil
}
l.leaderMu.Unlock()
- klog.Infof("leader election lock lost: %v",
l.electionID)
+ log.Infof("leader election lock lost: %v", l.electionID)
},
}
@@ -183,7 +183,7 @@ func (l *LeaderElection) Run(stop <-chan struct{}) {
if !l.enabled {
// Silently bypass leader election for single-node deployments
or when disabled
// No need to log this as it's expected behavior
-		klog.V(2).Infof("bypassing leader election: %v", l.electionID)
+		log.Debugf("bypassing leader election: %v", l.electionID)
for _, f := range l.runFns {
go f(stop)
}
@@ -258,7 +258,7 @@ func (l *LeaderElection) Run(stop <-chan struct{}) {
default:
// Otherwise, we may have lost our lock. This can
happen when the default revision changes and steals
// the lock from us.
- klog.Infof("Leader election cycle %v lost. Trying
again", l.cycle.Load())
+ log.Infof("Leader election cycle %v lost. Trying
again", l.cycle.Load())
}
}
}
diff --git a/sail/pkg/model/cluster_local.go b/sail/pkg/model/cluster_local.go
index 8908722a..73cae2bf 100644
--- a/sail/pkg/model/cluster_local.go
+++ b/sail/pkg/model/cluster_local.go
@@ -18,11 +18,11 @@
package model
import (
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"strings"
"sync"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
- "k8s.io/klog/v2"
)
var (
@@ -76,7 +76,7 @@ func (c *clusterLocalProvider) onMeshUpdated(e *Environment) {
}
if discoveryHost, _, err := e.GetDiscoveryAddress(); err != nil {
- klog.Errorf("failed to make discoveryAddress cluster-local:
%v", err)
+ log.Errorf("failed to make discoveryAddress cluster-local: %v",
err)
} else {
if !strings.HasSuffix(string(discoveryHost), domainSuffix) {
discoveryHost += host.Name("." + domainSuffix)
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 38e255c5..68c11264 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -67,8 +67,6 @@ type (
)
const (
- Proxyless = pm.Proxyless
-
IPv4 = pm.IPv4
IPv6 = pm.IPv6
Dual = pm.Dual
diff --git a/sail/pkg/model/endpointshards.go b/sail/pkg/model/endpointshards.go
index 2d4d2e56..3190b076 100644
--- a/sail/pkg/model/endpointshards.go
+++ b/sail/pkg/model/endpointshards.go
@@ -18,6 +18,7 @@
package model
import (
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"sync"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
@@ -25,7 +26,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/cluster"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/serviceregistry/provider"
- "k8s.io/klog/v2"
)
type ShardKey struct {
@@ -153,9 +153,9 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
// flip flopping between 1 and 0.
e.DeleteServiceShard(shard, hostname, namespace, true)
if logPushType {
- klog.Infof("Incremental push, service %s at shard %v
has no endpoints", hostname, shard)
+ log.Infof("Incremental push, service %s at shard %v has
no endpoints", hostname, shard)
} else {
- klog.Infof("Cache Update, Service %s at shard %v has no
endpoints", hostname, shard)
+ log.Infof("Cache Update, Service %s at shard %v has no
endpoints", hostname, shard)
}
return IncrementalPush
}
@@ -166,9 +166,9 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
// If we create a new endpoint shard, that means we have not seen the
service earlier. We should do a full push.
if created {
if logPushType {
- klog.Infof("Full push, new service %s/%s", namespace,
hostname)
+ log.Infof("Full push, new service %s/%s", namespace,
hostname)
} else {
- klog.Infof("Cache Update, new service %s/%s",
namespace, hostname)
+ log.Infof("Cache Update, new service %s/%s", namespace,
hostname)
}
pushType = FullPush
}
@@ -198,12 +198,12 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
newUnhealthyCount++
}
}
- klog.Warningf("UpdateServiceEndpoints: service=%s, shard=%v,
oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d,
unhealthy=%d), needPush=%v, pushType=%v",
+ log.Warnf("UpdateServiceEndpoints: service=%s, shard=%v,
oldEndpoints=%d (healthy=%d, unhealthy=%d), newEndpoints=%d (healthy=%d,
unhealthy=%d), needPush=%v, pushType=%v",
hostname, shard, len(oldDubboEndpoints),
oldHealthyCount, oldUnhealthyCount, len(newDubboEndpoints), newHealthyCount,
newUnhealthyCount, needPush, pushType)
}
if pushType != FullPush && !needPush {
- klog.Warningf("No push, either old endpoint health status did
not change or new endpoint came with unhealthy status, %v (oldEndpoints=%d,
newEndpoints=%d)", hostname, len(oldDubboEndpoints), len(newDubboEndpoints))
+ log.Warnf("No push, either old endpoint health status did not
change or new endpoint came with unhealthy status, %v (oldEndpoints=%d,
newEndpoints=%d)", hostname, len(oldDubboEndpoints), len(newDubboEndpoints))
pushType = NoPush
}
@@ -216,9 +216,9 @@ func (e *EndpointIndex) UpdateServiceEndpoints(
if saUpdated && pushType != FullPush {
// Avoid extra logging if already a full push
if logPushType {
- klog.Infof("Full push, service accounts changed, %v",
hostname)
+ log.Infof("Full push, service accounts changed, %v",
hostname)
} else {
- klog.Infof("Cache Update, service accounts changed,
%v", hostname)
+ log.Infof("Cache Update, service accounts changed, %v",
hostname)
}
pushType = FullPush
}
@@ -265,7 +265,7 @@ func updateShardServiceAccount(shards *EndpointShards,
serviceName string) bool
if !oldServiceAccount.Equals(serviceAccounts) {
shards.ServiceAccounts = serviceAccounts
- klog.V(2).Infof("Updating service accounts now, svc %v, before
service account %v, after %v",
+ log.Debugf("Updating service accounts now, svc %v, before
service account %v, after %v",
serviceName, oldServiceAccount, serviceAccounts)
return true
}
diff --git a/sail/pkg/model/push_context.go b/sail/pkg/model/push_context.go
index 8e9114f1..02225999 100644
--- a/sail/pkg/model/push_context.go
+++ b/sail/pkg/model/push_context.go
@@ -19,6 +19,7 @@ package model
import (
"cmp"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"sync"
"time"
@@ -35,13 +36,11 @@ import (
"go.uber.org/atomic"
meshconfig "istio.io/api/mesh/v1alpha1"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/klog/v2"
)
var (
LastPushStatus *PushContext
- // LastPushMutex will protect the LastPushStatus
- LastPushMutex sync.Mutex
+ LastPushMutex sync.Mutex
)
type TriggerReason string
@@ -318,7 +317,6 @@ func (ps *PushContext) OnConfigChange() {
}
func (ps *PushContext) ServiceForHostname(proxy *Proxy, hostname host.Name)
*Service {
- // TODO SidecarScope?
for _, service := range ps.ServiceIndex.HostnameAndNamespace[hostname] {
return service
}
@@ -387,7 +385,7 @@ func (ps *PushContext) initServiceRegistry(env
*Environment, configsUpdate sets.
}
if existing :=
ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace];
existing != nil &&
!(existing.Attributes.ServiceRegistry !=
provider.Kubernetes && s.Attributes.ServiceRegistry == provider.Kubernetes) {
- klog.V(2).Infof("Service %s/%s from registry %s ignored
by %s/%s/%s", s.Attributes.Namespace, s.Hostname, s.Attributes.ServiceRegistry,
+ log.Debugf("Service %s/%s from registry %s ignored by
%s/%s/%s", s.Attributes.Namespace, s.Hostname, s.Attributes.ServiceRegistry,
existing.Attributes.ServiceRegistry,
existing.Attributes.Namespace, existing.Hostname)
} else {
ps.ServiceIndex.HostnameAndNamespace[s.Hostname][s.Attributes.Namespace] = s
diff --git a/sail/pkg/model/typed_xds_cache.go
b/sail/pkg/model/typed_xds_cache.go
index 0951c71b..bbd25d6e 100644
--- a/sail/pkg/model/typed_xds_cache.go
+++ b/sail/pkg/model/typed_xds_cache.go
@@ -19,6 +19,7 @@ package model
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"sync"
"time"
@@ -29,7 +30,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/golang-lru/v2/simplelru"
"google.golang.org/protobuf/testing/protocmp"
- "k8s.io/klog/v2"
)
type CacheToken uint64
@@ -218,7 +218,7 @@ func (l *lruCache[K]) Clear(configs sets.Set[ConfigKey]) {
delete(l.configIndex, hc)
}
if clearedCount > 0 {
- klog.V(3).Infof("lruCache.Clear: cleared %d cache entries for
%d configs", clearedCount, len(configs))
+ log.Debugf("lruCache.Clear: cleared %d cache entries for %d
configs", clearedCount, len(configs))
}
}
diff --git a/sail/pkg/model/xds_cache.go b/sail/pkg/model/xds_cache.go
index 5b279f5d..614f27b0 100644
--- a/sail/pkg/model/xds_cache.go
+++ b/sail/pkg/model/xds_cache.go
@@ -18,13 +18,13 @@
package model
import (
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"time"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
- "k8s.io/klog/v2"
)
type XdsCacheImpl struct {
@@ -116,7 +116,7 @@ func (x XdsCacheImpl) Add(entry XdsCacheEntry, pushRequest
*PushRequest, value *
key := k.(uint64)
x.rds.Add(key, entry, pushRequest, value)
default:
- klog.Errorf("unknown type %s", entry.Type())
+ log.Errorf("unknown type %s", entry.Type())
}
}
@@ -140,7 +140,7 @@ func (x XdsCacheImpl) Get(entry XdsCacheEntry)
*discovery.Resource {
key := k.(uint64)
return x.rds.Get(key)
default:
- klog.Errorf("unknown type %s", entry.Type())
+ log.Errorf("unknown type %s", entry.Type())
return nil
}
}
diff --git a/sail/pkg/networking/grpcgen/cds.go
b/sail/pkg/networking/grpcgen/cds.go
index 124981a8..e206d4f2 100644
--- a/sail/pkg/networking/grpcgen/cds.go
+++ b/sail/pkg/networking/grpcgen/cds.go
@@ -19,6 +19,7 @@ package grpcgen
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
@@ -28,7 +29,6 @@ import (
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
- "k8s.io/klog/v2"
)
func (g *GrpcConfigGenerator) BuildClusters(node *model.Proxy, push
*model.PushContext, names []string) model.Resources {
@@ -37,7 +37,7 @@ func (g *GrpcConfigGenerator) BuildClusters(node
*model.Proxy, push *model.PushC
for defaultClusterName, subsetFilter := range filter {
builder, err := newClusterBuilder(node, push,
defaultClusterName, subsetFilter)
if err != nil {
- klog.Warning(err)
+ log.Warn(err)
continue
}
clusters = append(clusters, builder.build()...)
@@ -51,7 +51,7 @@ func (g *GrpcConfigGenerator) BuildClusters(node
*model.Proxy, push *model.PushC
})
}
if len(resp) == 0 && len(names) == 0 {
- klog.Warningf("did not generate any cds for %s; no names
provided", node.ID)
+ log.Warnf("did not generate any cds for %s; no names provided",
node.ID)
}
return resp
}
diff --git a/sail/pkg/server/instance.go b/sail/pkg/server/instance.go
index 48a70aa3..7d5c1c46 100644
--- a/sail/pkg/server/instance.go
+++ b/sail/pkg/server/instance.go
@@ -18,7 +18,7 @@
package server
import (
- "k8s.io/klog/v2"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"sync"
"time"
)
@@ -68,7 +68,7 @@ func (i *instance) Start(stop <-chan struct{}) error {
}
runtime := time.Since(t0)
if runtime > time.Second {
- klog.Warningf("slow startup task")
+ log.Warnf("slow startup task")
}
default:
startupDone = true
@@ -88,7 +88,7 @@ func (i *instance) Start(stop <-chan struct{}) error {
}
runtime := time.Since(t0)
if runtime > time.Second {
- klog.Warningf("slow post-start task")
+ log.Warnf("slow post-start task")
}
}
@@ -101,7 +101,7 @@ func (i *instance) Start(stop <-chan struct{}) error {
func (i *instance) RunComponent(name string, t Component) {
select {
case <-i.done:
- klog.Warningf("attempting to run a new component %q after the
server was shutdown", name)
+ log.Warnf("attempting to run a new component %q after the
server was shutdown", name)
default:
i.components <- task{name, t}
}
@@ -126,5 +126,5 @@ func (i *instance) Wait() {
}
func logComponentError(name string, err error) {
- klog.Errorf("failure in server component %q: %v", name, err)
+ log.Errorf("failure in server component %q: %v", name, err)
}
diff --git a/sail/pkg/serviceregistry/aggregate/controller.go
b/sail/pkg/serviceregistry/aggregate/controller.go
index 20b4fea8..871b70f4 100644
--- a/sail/pkg/serviceregistry/aggregate/controller.go
+++ b/sail/pkg/serviceregistry/aggregate/controller.go
@@ -18,6 +18,7 @@
package aggregate
import (
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"sync"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
@@ -75,7 +76,7 @@ func (c *Controller) Run(stop <-chan struct{}) {
c.storeLock.Unlock()
<-stop
- klog.Info("Registry Aggregator terminated")
+ log.Info("Registry Aggregator terminated")
}
func (c *Controller) HasSynced() bool {
diff --git a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
index 1f439556..4e574ef0 100644
--- a/sail/pkg/serviceregistry/kube/controller/endpointslice.go
+++ b/sail/pkg/serviceregistry/kube/controller/endpointslice.go
@@ -19,6 +19,7 @@ package controller
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"strings"
"sync"
@@ -38,7 +39,6 @@ import (
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
- "k8s.io/klog/v2"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
@@ -212,7 +212,7 @@ func (esc *endpointSliceController)
updateEndpointCacheForSlice(hostName host.Na
if e.Conditions.Terminating != nil {
terminating = fmt.Sprintf("%v",
*e.Conditions.Terminating)
}
- klog.Warningf("endpointHealthStatus: address=%s,
Ready=%s, Terminating=%s, HealthStatus=%v, svc=%v",
+ log.Debugf("endpointHealthStatus: address=%s, Ready=%s,
Terminating=%s, HealthStatus=%v, svc=%v",
e.Addresses[0], ready, terminating,
healthStatus, svc != nil)
}
@@ -310,14 +310,14 @@ func (esc *endpointSliceController)
updateEndpointCacheForSlice(hostName host.Na
}
matched = true
-
klog.V(2).InfoS("updateEndpointCacheForSlice: matched ServicePort
(servicePort=%d, targetPort=%d, portName='%s') for EndpointSlice.Port
(portNum=%d, portName='%s')",
+
log.Debugf("updateEndpointCacheForSlice: matched ServicePort (servicePort=%d,
targetPort=%d, portName='%s') for EndpointSlice.Port (portNum=%d,
portName='%s')",
servicePortNum,
targetPortNum, portName, epSlicePortNum, epSlicePortName)
break
}
}
if !matched {
-
klog.V(2).InfoS("updateEndpointCacheForSlice: failed to match
EndpointSlice.Port (portNum=%d, portName='%s') with Service %s, using
EndpointSlice values",
+
log.Debugf("updateEndpointCacheForSlice: failed to match EndpointSlice.Port
(portNum=%d, portName='%s') with Service %s, using EndpointSlice values",
epSlicePortNum,
epSlicePortName, svcNamespacedName.Name)
// Fallback: use EndpointSlice
values
servicePortNum = epSlicePortNum
@@ -333,12 +333,12 @@ func (esc *endpointSliceController)
updateEndpointCacheForSlice(hostName host.Na
if epSlicePortName != "" {
portName = epSlicePortName
}
-
klog.V(2).InfoS("updateEndpointCacheForSlice: Service not found for %s, using
EndpointSlice values (portNum=%d, portName='%s')",
+
log.Debugf("updateEndpointCacheForSlice: Service not found for %s, using
EndpointSlice values (portNum=%d, portName='%s')",
svcNamespacedName.Name,
epSlicePortNum, epSlicePortName)
}
// CRITICAL: Log endpoint creation with actual
values for debugging
- klog.V(2).InfoS("updateEndpointCacheForSlice:
creating endpoint for service %s (address=%s, servicePortNum=%d,
targetPortNum=%d, portName='%s', hostname=%s, kubeSvc=%v)",
+ log.Debugf("updateEndpointCacheForSlice:
creating endpoint for service %s (address=%s, servicePortNum=%d,
targetPortNum=%d, portName='%s', hostname=%s, kubeSvc=%v)",
svcNamespacedName.Name, a,
servicePortNum, targetPortNum, portName, hostName, kubeSvc != nil)
// CRITICAL FIX: According to Istio's
implementation and Kubernetes EndpointSlice spec:
@@ -365,20 +365,20 @@ func (esc *endpointSliceController)
updateEndpointCacheForSlice(hostName host.Na
// CRITICAL: Log if endpoint is unhealthy and
service doesn't support it
if healthStatus == model.UnHealthy &&
!supportsUnhealthy {
if svc != nil {
-
klog.V(2).InfoS("updateEndpointCacheForSlice: endpoint %s is unhealthy
(HealthStatus=%v) but service %s does not support unhealthy endpoints
(PublishNotReadyAddresses=%v). Endpoint will be filtered in EDS.",
+
log.Debugf("updateEndpointCacheForSlice: endpoint %s is unhealthy
(HealthStatus=%v) but service %s does not support unhealthy endpoints
(PublishNotReadyAddresses=%v). Endpoint will be filtered in EDS.",
a, healthStatus,
svcNamespacedName.Name, svc.Attributes.PublishNotReadyAddresses)
} else {
-
klog.V(2).InfoS("updateEndpointCacheForSlice: endpoint %s is unhealthy
(HealthStatus=%v) but service %s is nil. Endpoint will be filtered in EDS.",
+
log.Debugf("updateEndpointCacheForSlice: endpoint %s is unhealthy
(HealthStatus=%v) but service %s is nil. Endpoint will be filtered in EDS.",
a, healthStatus,
svcNamespacedName.Name)
}
}
// CRITICAL: Verify the endpoint was created
with correct ServicePortName
if dubboEndpoint != nil {
-
klog.V(2).InfoS("updateEndpointCacheForSlice: created endpoint with
ServicePortName='%s', EndpointPort=%d, address=%s",
+
log.Debugf("updateEndpointCacheForSlice: created endpoint with
ServicePortName='%s', EndpointPort=%d, address=%s",
dubboEndpoint.ServicePortName,
dubboEndpoint.EndpointPort, dubboEndpoint.FirstAddressOrNil())
} else {
-
klog.Errorf("updateEndpointCacheForSlice: buildDubboEndpoint returned nil for
address=%s, targetPortNum=%d, portName='%s'",
+					log.Errorf("updateEndpointCacheForSlice: buildDubboEndpoint returned nil for address=%s, targetPortNum=%d, portName='%s'",
a, targetPortNum, portName)
}
if len(overrideAddresses) > 1 {
diff --git a/sail/pkg/serviceregistry/kube/controller/multicluster.go
b/sail/pkg/serviceregistry/kube/controller/multicluster.go
index 5bbf3b61..4659c1be 100644
--- a/sail/pkg/serviceregistry/kube/controller/multicluster.go
+++ b/sail/pkg/serviceregistry/kube/controller/multicluster.go
@@ -19,6 +19,7 @@ package controller
import (
"github.com/apache/dubbo-kubernetes/pkg/kube/multicluster"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/clustertrustbundle"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
"github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
@@ -125,7 +126,7 @@ func (m *Multicluster) initializeCluster(cluster
*multicluster.Cluster, kubeCont
election.SetEnabled(false)
}
election.AddRunFunction(func(leaderStop
<-chan struct{}) {
- klog.Infof("starting
clustertrustbundle controller for cluster %s", cluster.ID)
+ log.Infof("starting
clustertrustbundle controller for cluster %s", cluster.ID)
c :=
clustertrustbundle.NewController(client, m.caBundleWatcher)
client.RunAndWait(clusterStopCh)
c.Run(leaderStop)
@@ -144,7 +145,7 @@ func (m *Multicluster) initializeCluster(cluster
*multicluster.Cluster, kubeCont
election.SetEnabled(false)
}
election.AddRunFunction(func(leaderStop
<-chan struct{}) {
- klog.Infof("starting namespace
controller for cluster %s", cluster.ID)
+ log.Infof("starting namespace
controller for cluster %s", cluster.ID)
nc :=
NewNamespaceController(client, m.caBundleWatcher)
// Start informers again. This
fixes the case where informers for namespace do not start,
// as we create them only after
acquiring the leader lock
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index f2048ab9..02d39324 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -20,6 +20,7 @@ package xds
import (
"context"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"strconv"
"sync"
"time"
@@ -36,7 +37,6 @@ import (
"go.uber.org/atomic"
"golang.org/x/time/rate"
"google.golang.org/grpc"
- "k8s.io/klog/v2"
)
type DebounceOptions struct {
@@ -110,7 +110,7 @@ func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
}
func (s *DiscoveryServer) CachesSynced() {
- klog.Infof("All caches have been synced up in %v, marking server
ready", time.Since(s.DiscoveryStartTime))
+ log.Infof("All caches have been synced up in %v, marking server ready",
time.Since(s.DiscoveryStartTime))
s.serverReady.Store(true)
}
@@ -243,7 +243,7 @@ func (s *DiscoveryServer) dropCacheForRequest(req
*model.PushRequest) {
// If we don't know what updated, cannot safely cache. Clear the whole
cache
if req.Forced {
s.Cache.ClearAll()
- klog.V(2).Infof("dropCacheForRequest: cleared all cache
(Forced=true)")
+ log.Debugf("dropCacheForRequest: cleared all cache
(Forced=true)")
} else {
// Otherwise, just clear the updated configs
// CRITICAL: Log cache clear for debugging
@@ -252,7 +252,7 @@ func (s *DiscoveryServer) dropCacheForRequest(req
*model.PushRequest) {
for ckey := range req.ConfigsUpdated {
configs = append(configs, ckey.String())
}
- klog.V(3).Infof("dropCacheForRequest: clearing cache
for configs: %v", configs)
+ log.Debugf("dropCacheForRequest: clearing cache for
configs: %v", configs)
}
s.Cache.Clear(req.ConfigsUpdated)
}
@@ -327,11 +327,11 @@ func debounce(ch chan *model.PushRequest, stopCh <-chan
struct{}, opts DebounceO
if req != nil {
pushCounter++
if req.ConfigsUpdated == nil {
- klog.Infof("Push debounce stable[%d] %d
for reason %s: %v since last change, %v since last push, full=%v",
+ log.Infof("Push debounce stable[%d] %d
for reason %s: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents,
reasonsUpdated(req),
quietTime, eventDelay, req.Full)
} else {
- klog.Infof("Push debounce stable[%d] %d
for config %s: %v since last change, %v since last push, full=%v",
+ log.Infof("Push debounce stable[%d] %d
for config %s: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents,
configsUpdated(req),
quietTime, eventDelay, req.Full)
}
@@ -376,18 +376,18 @@ func debounce(ch chan *model.PushRequest, stopCh <-chan
struct{}, opts DebounceO
if wasNewDebounceWindow {
// First event in a new debounce window
if len(r.ConfigsUpdated) > 0 {
- klog.V(2).Infof("Push debounce: new
window started, event[1] for config %s (reason: %s)",
+ log.Debugf("Push debounce: new window
started, event[1] for config %s (reason: %s)",
configsUpdated(r),
reasonsUpdated(r))
} else {
- klog.V(2).Infof("Push debounce: new
window started, event[1] for reason %s", reasonsUpdated(r))
+ log.Debugf("Push debounce: new window
started, event[1] for reason %s", reasonsUpdated(r))
}
} else {
// Event merged into existing debounce window
if len(r.ConfigsUpdated) > 0 {
- klog.V(2).Infof("Push debounce:
event[%d] merged into window for config %s (reason: %s, total events: %d)",
+ log.Debugf("Push debounce: event[%d]
merged into window for config %s (reason: %s, total events: %d)",
debouncedEvents,
configsUpdated(r), reasonsUpdated(r), debouncedEvents)
} else {
- klog.V(2).Infof("Push debounce:
event[%d] merged into window for reason %s (total events: %d)",
+ log.Debugf("Push debounce: event[%d]
merged into window for reason %s (total events: %d)",
debouncedEvents,
reasonsUpdated(r), debouncedEvents)
}
}
@@ -438,7 +438,7 @@ func doSendPushes(stopCh <-chan struct{}, semaphore chan
struct{}, queue *PushQu
return
case <-closed: // grpc stream was closed
doneFunc()
- klog.Infof("Client closed connection
%v", client.ID())
+ log.Infof("Client closed connection
%v", client.ID())
}
}()
}
diff --git a/security/pkg/pki/ca/ca.go b/security/pkg/pki/ca/ca.go
index a9eb7a80..9d2d16bb 100644
--- a/security/pkg/pki/ca/ca.go
+++ b/security/pkg/pki/ca/ca.go
@@ -24,6 +24,7 @@ import (
"encoding/pem"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/backoff"
+ "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/security/pkg/cmd"
caerror "github.com/apache/dubbo-kubernetes/security/pkg/pki/error"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
@@ -32,11 +33,12 @@ import (
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
- "k8s.io/klog/v2"
"os"
"time"
)
+var pkiCaLog = log.RegisterScope("pkica", "Aegis CA log")
+
const (
dubboCASecretType = "dubbo.io/ca-root"
CACertFile = "ca-cert.pem"
@@ -173,7 +175,7 @@ func NewSelfSignedDubboCAOptions(ctx context.Context,
}
// 3. if use cacerts disabled, create
`dubbo-ca-secret`, otherwise create `cacerts`.
- klog.Infof("CASecret %s not found, will create one",
caCertName)
+ pkiCaLog.Infof("CASecret %s not found, will create
one", caCertName)
options := util.CertOptions{
TTL: caCertTTL,
Org: org,
@@ -184,13 +186,13 @@ func NewSelfSignedDubboCAOptions(ctx context.Context,
}
pemCert, pemKey, ckErr :=
util.GenCertKeyFromOptions(options)
if ckErr != nil {
- klog.Errorf("unable to generate CA cert and key
for self-signed CA (%v)", ckErr)
+ pkiCaLog.Errorf("unable to generate CA cert and
key for self-signed CA (%v)", ckErr)
return fmt.Errorf("unable to generate CA cert
and key for self-signed CA (%v)", ckErr)
}
rootCerts, err := util.AppendRootCerts(pemCert,
rootCertFile)
if err != nil {
- klog.Errorf("failed to append root certificates
(%v)", err)
+ pkiCaLog.Errorf("failed to append root
certificates (%v)", err)
return fmt.Errorf("failed to append root
certificates (%v)", err)
}
if caOpts.KeyCertBundle, err =
util.NewVerifiedKeyCertBundleFromPem(
@@ -200,22 +202,22 @@ func NewSelfSignedDubboCAOptions(ctx context.Context,
rootCerts,
nil,
); err != nil {
- klog.Errorf("failed to create CA KeyCertBundle
(%v)", err)
+ pkiCaLog.Errorf("failed to create CA
KeyCertBundle (%v)", err)
return fmt.Errorf("failed to create CA
KeyCertBundle (%v)", err)
}
// Write the key/cert back to secret, so they will be
persistent when CA restarts.
secret := BuildSecret(caCertName, namespace, nil, nil,
pemCert, pemCert, pemKey, dubboCASecretType)
_, err =
client.Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
if err != nil {
- klog.Errorf("Failed to create secret %s (%v)",
caCertName, err)
+ pkiCaLog.Errorf("Failed to create secret %s
(%v)", caCertName, err)
return err
}
- klog.Infof("Using self-generated public key: %v",
string(rootCerts))
+ pkiCaLog.Infof("Using self-generated public key: %v",
string(rootCerts))
return nil
}
return err
})
- klog.Infof("Set secret name for self-signed CA cert rotator to %s",
caCertName)
+ pkiCaLog.Infof("Set secret name for self-signed CA cert rotator to %s",
caCertName)
caOpts.RotatorConfig.secretName = caCertName
return caOpts, err
}
@@ -299,7 +301,7 @@ func (ca *DubboCA) signWithCertChain(csrPEM []byte,
subjectIDs []string, request
func loadSelfSignedCaSecret(client corev1.CoreV1Interface, namespace string,
caCertName string, rootCertFile string, caOpts *DubboCAOptions) error {
caSecret, err := client.Secrets(namespace).Get(context.TODO(),
caCertName, metav1.GetOptions{})
if err == nil {
- klog.Infof("Load signing key and cert from existing secret
%s/%s", caSecret.Namespace, caSecret.Name)
+ pkiCaLog.Infof("Load signing key and cert from existing secret
%s/%s", caSecret.Namespace, caSecret.Name)
rootCerts, err :=
util.AppendRootCerts(caSecret.Data[CACertFile], rootCertFile)
if err != nil {
return fmt.Errorf("failed to append root certificates
(%v)", err)
@@ -313,7 +315,7 @@ func loadSelfSignedCaSecret(client corev1.CoreV1Interface,
namespace string, caC
); err != nil {
return fmt.Errorf("failed to create CA KeyCertBundle
(%v)", err)
}
- klog.Infof("Using existing public key: \n%v", string(rootCerts))
+ pkiCaLog.Infof("Using existing public key: \n%v",
string(rootCerts))
}
return err
}
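The CA bootstrap logged above ("CASecret %s not found, will create one" vs "Load signing key and cert from existing secret") is a load-or-create decision over the CA secret. Sketched under assumptions (the map stands in for the Kubernetes Secret data; the key matches the `CACertFile` constant; `generate` stands in for `util.GenCertKeyFromOptions`):

```go
package main

import "fmt"

// loadOrCreate reuses signing material from an existing CA secret when
// present; otherwise it generates a fresh root and reports created=true,
// signalling that the result must be written back to a Secret so it
// survives CA restarts.
func loadOrCreate(secretData map[string][]byte, generate func() []byte) (cert []byte, created bool) {
	if existing, ok := secretData["ca-cert.pem"]; ok {
		return existing, false // "Load signing key and cert from existing secret"
	}
	return generate(), true // "CASecret not found, will create one"
}

func main() {
	cert, created := loadOrCreate(map[string][]byte{}, func() []byte { return []byte("PEM...") })
	fmt.Printf("created=%v len=%d\n", created, len(cert))
}
```

Persisting the generated root is the important half: without the write-back, every CA restart would mint a new root and invalidate issued certificates.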
diff --git a/test/grpc-proxyless/.dockerignore
b/tests/grpc-proxyless/.dockerignore
similarity index 100%
rename from test/grpc-proxyless/.dockerignore
rename to tests/grpc-proxyless/.dockerignore
diff --git a/test/grpc-proxyless/.gitignore b/tests/grpc-proxyless/.gitignore
similarity index 100%
rename from test/grpc-proxyless/.gitignore
rename to tests/grpc-proxyless/.gitignore
diff --git a/test/grpc-proxyless/Dockerfile.consumer
b/tests/grpc-proxyless/Dockerfile.consumer
similarity index 100%
rename from test/grpc-proxyless/Dockerfile.consumer
rename to tests/grpc-proxyless/Dockerfile.consumer
diff --git a/test/grpc-proxyless/Dockerfile.producer
b/tests/grpc-proxyless/Dockerfile.producer
similarity index 100%
rename from test/grpc-proxyless/Dockerfile.producer
rename to tests/grpc-proxyless/Dockerfile.producer
diff --git a/test/grpc-proxyless/README.md b/tests/grpc-proxyless/README.md
similarity index 100%
rename from test/grpc-proxyless/README.md
rename to tests/grpc-proxyless/README.md
diff --git a/test/grpc-proxyless/consumer/main.go
b/tests/grpc-proxyless/consumer/main.go
similarity index 100%
rename from test/grpc-proxyless/consumer/main.go
rename to tests/grpc-proxyless/consumer/main.go
diff --git a/test/grpc-proxyless/generate-proto.sh
b/tests/grpc-proxyless/generate-proto.sh
similarity index 100%
rename from test/grpc-proxyless/generate-proto.sh
rename to tests/grpc-proxyless/generate-proto.sh
diff --git a/test/grpc-proxyless/go.mod b/tests/grpc-proxyless/go.mod
similarity index 100%
rename from test/grpc-proxyless/go.mod
rename to tests/grpc-proxyless/go.mod
diff --git a/test/grpc-proxyless/go.sum b/tests/grpc-proxyless/go.sum
similarity index 100%
rename from test/grpc-proxyless/go.sum
rename to tests/grpc-proxyless/go.sum
diff --git a/test/grpc-proxyless/producer/main.go
b/tests/grpc-proxyless/producer/main.go
similarity index 100%
rename from test/grpc-proxyless/producer/main.go
rename to tests/grpc-proxyless/producer/main.go
diff --git a/test/grpc-proxyless/proto/echo.proto
b/tests/grpc-proxyless/proto/echo.proto
similarity index 100%
rename from test/grpc-proxyless/proto/echo.proto
rename to tests/grpc-proxyless/proto/echo.proto
diff --git a/test/grpc-proxyless/proto/gen.sh
b/tests/grpc-proxyless/proto/gen.sh
similarity index 100%
rename from test/grpc-proxyless/proto/gen.sh
rename to tests/grpc-proxyless/proto/gen.sh
diff --git a/test/grpc-proxyless/test-commands.sh
b/tests/grpc-proxyless/test-commands.sh
similarity index 100%
rename from test/grpc-proxyless/test-commands.sh
rename to tests/grpc-proxyless/test-commands.sh
diff --git a/test/grpc-proxyless/test.sh b/tests/grpc-proxyless/test.sh
similarity index 100%
rename from test/grpc-proxyless/test.sh
rename to tests/grpc-proxyless/test.sh
diff --git a/security/tools/generate_cert/main.go b/tools/generate_cert/main.go
similarity index 100%
rename from security/tools/generate_cert/main.go
rename to tools/generate_cert/main.go
diff --git a/security/tools/generate_csr/main.go b/tools/generate_csr/main.go
similarity index 100%
rename from security/tools/generate_csr/main.go
rename to tools/generate_csr/main.go