This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 38deb59 Merge pull request #16241 from [BEAM-13440] [Playground]
Implement initialization of Cloud Logger
38deb59 is described below
commit 38deb59f3baf2d54f215ffd7a9900918419ec87b
Author: Pavel Avilov <[email protected]>
AuthorDate: Wed Dec 22 01:54:44 2021 +0300
Merge pull request #16241 from [BEAM-13440] [Playground] Implement
initialization of Cloud Logger
* Implement initialization of Cloud Logger
* Edit SetupLogger method;
* Edit comments for methods
* Refactoring code;
Co-authored-by: daria-malkova <[email protected]>
---
playground/backend/cmd/server/server.go | 3 +
.../backend/internal/environment/application.go | 20 +++++-
.../internal/environment/application_test.go | 82 ++++++++++++++++++++++
.../internal/environment/environment_service.go | 7 +-
.../environment/environment_service_test.go | 21 ++++--
.../internal/logger/cloud_logging_handler.go | 1 +
playground/backend/internal/logger/logger.go | 51 +++++++-------
playground/backend/internal/logger/logger_test.go | 4 +-
playground/backend/internal/logger/std_handler.go | 78 ++++++++++++++++++++
9 files changed, 233 insertions(+), 34 deletions(-)
diff --git a/playground/backend/cmd/server/server.go
b/playground/backend/cmd/server/server.go
index 143764b..4b47c78 100644
--- a/playground/backend/cmd/server/server.go
+++ b/playground/backend/cmd/server/server.go
@@ -36,6 +36,9 @@ func runServer() error {
if err != nil {
return err
}
+
+ logger.SetupLogger(ctx, envService.ApplicationEnvs.LaunchSite(),
envService.ApplicationEnvs.GoogleProjectId())
+
grpcServer := grpc.NewServer()
cacheService, err := setupCache(ctx, envService.ApplicationEnvs)
diff --git a/playground/backend/internal/environment/application.go
b/playground/backend/internal/environment/application.go
index 119f812..58665ea 100644
--- a/playground/backend/internal/environment/application.go
+++ b/playground/backend/internal/environment/application.go
@@ -90,14 +90,22 @@ type ApplicationEnvs struct {
// pipelineExecuteTimeout is timeout for code processing
pipelineExecuteTimeout time.Duration
+
+ // launchSite is a launch site of application
+ launchSite string
+
+ // projectId is the Google Cloud project id
+ projectId string
}
// NewApplicationEnvs constructor for ApplicationEnvs
-func NewApplicationEnvs(workingDir string, cacheEnvs *CacheEnvs,
pipelineExecuteTimeout time.Duration) *ApplicationEnvs {
+func NewApplicationEnvs(workingDir, launchSite, projectId string, cacheEnvs
*CacheEnvs, pipelineExecuteTimeout time.Duration) *ApplicationEnvs {
return &ApplicationEnvs{
workingDir: workingDir,
cacheEnvs: cacheEnvs,
pipelineExecuteTimeout: pipelineExecuteTimeout,
+ launchSite: launchSite,
+ projectId: projectId,
}
}
@@ -115,3 +123,13 @@ func (ae *ApplicationEnvs) CacheEnvs() *CacheEnvs {
func (ae *ApplicationEnvs) PipelineExecuteTimeout() time.Duration {
return ae.pipelineExecuteTimeout
}
+
+// LaunchSite returns launch site of application
+func (ae *ApplicationEnvs) LaunchSite() string {
+ return ae.launchSite
+}
+
+// GoogleProjectId returns Google Cloud project id
+func (ae *ApplicationEnvs) GoogleProjectId() string {
+ return ae.projectId
+}
diff --git a/playground/backend/internal/environment/application_test.go
b/playground/backend/internal/environment/application_test.go
index c023fcf..4c6d062 100644
--- a/playground/backend/internal/environment/application_test.go
+++ b/playground/backend/internal/environment/application_test.go
@@ -260,3 +260,85 @@ func TestApplicationEnvs_PipelineExecuteTimeout(t
*testing.T) {
})
}
}
+
+func TestApplicationEnvs_LaunchSite(t *testing.T) {
+ type fields struct {
+ workingDir string
+ cacheEnvs *CacheEnvs
+ pipelineExecuteTimeout time.Duration
+ launchSite string
+ googleProjectId string
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ // Test case with calling LaunchSite method.
+ // As a result, want to receive an expected launch site.
+ name: "get launch site",
+ fields: fields{
+ workingDir: "",
+ cacheEnvs: &CacheEnvs{},
+ pipelineExecuteTimeout: 0,
+ launchSite: "local",
+ },
+ want: "local",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ae := &ApplicationEnvs{
+ workingDir: tt.fields.workingDir,
+ cacheEnvs: tt.fields.cacheEnvs,
+ pipelineExecuteTimeout:
tt.fields.pipelineExecuteTimeout,
+ launchSite: tt.fields.launchSite,
+ projectId:
tt.fields.googleProjectId,
+ }
+ if got := ae.LaunchSite(); got != tt.want {
+ t.Errorf("LaunchSite() = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}
+
+func TestApplicationEnvs_GoogleProjectId(t *testing.T) {
+ type fields struct {
+ workingDir string
+ cacheEnvs *CacheEnvs
+ pipelineExecuteTimeout time.Duration
+ googleProjectId string
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ // Test case with calling GoogleProjectId method.
+ // As a result, want to receive an expected project id.
+ name: "get google project id",
+ fields: fields{
+ workingDir: "",
+ cacheEnvs: &CacheEnvs{},
+ pipelineExecuteTimeout: 0,
+ googleProjectId:
"MOCK_GOOGLE_PROJECT_ID",
+ },
+ want: "MOCK_GOOGLE_PROJECT_ID",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ae := &ApplicationEnvs{
+ workingDir: tt.fields.workingDir,
+ cacheEnvs: tt.fields.cacheEnvs,
+ pipelineExecuteTimeout:
tt.fields.pipelineExecuteTimeout,
+ projectId:
tt.fields.googleProjectId,
+ }
+ if got := ae.GoogleProjectId(); got != tt.want {
+ t.Errorf("GoogleProjectId() = %v, want %v",
got, tt.want)
+ }
+ })
+ }
+}
diff --git a/playground/backend/internal/environment/environment_service.go
b/playground/backend/internal/environment/environment_service.go
index 2861618..2a975f2 100644
--- a/playground/backend/internal/environment/environment_service.go
+++ b/playground/backend/internal/environment/environment_service.go
@@ -41,6 +41,9 @@ const (
cacheKeyExpirationTimeKey = "KEY_EXPIRATION_TIME"
pipelineExecuteTimeoutKey = "PIPELINE_EXPIRATION_TIMEOUT"
protocolTypeKey = "PROTOCOL_TYPE"
+ launchSiteKey = "LAUNCH_SITE"
+ projectIdKey = "GOOGLE_CLOUD_PROJECT"
+ defaultLaunchSite = "local"
defaultProtocol = "HTTP"
defaultIp = "localhost"
defaultPort = 8080
@@ -90,6 +93,8 @@ func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error)
{
cacheExpirationTime := defaultCacheKeyExpirationTime
cacheType := getEnv(cacheTypeKey, defaultCacheType)
cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress)
+ launchSite := getEnv(launchSiteKey, defaultLaunchSite)
+ projectId := os.Getenv(projectIdKey)
if value, present := os.LookupEnv(cacheKeyExpirationTimeKey); present {
if converted, err := time.ParseDuration(value); err == nil {
@@ -107,7 +112,7 @@ func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs,
error) {
}
if value, present := os.LookupEnv(workingDirKey); present {
- return NewApplicationEnvs(value, NewCacheEnvs(cacheType,
cacheAddress, cacheExpirationTime), pipelineExecuteTimeout), nil
+ return NewApplicationEnvs(value, launchSite, projectId,
NewCacheEnvs(cacheType, cacheAddress, cacheExpirationTime),
pipelineExecuteTimeout), nil
}
return nil, errors.New("APP_WORK_DIR env should be provided with
os.env")
}
diff --git
a/playground/backend/internal/environment/environment_service_test.go
b/playground/backend/internal/environment/environment_service_test.go
index 600e933..cfadce1 100644
--- a/playground/backend/internal/environment/environment_service_test.go
+++ b/playground/backend/internal/environment/environment_service_test.go
@@ -26,8 +26,8 @@ import (
)
const (
- javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\":
\"java\",\n \"test_cmd\": \"java\",\n \"compile_args\": [\n \"-d\",\n
\"bin\",\n \"-classpath\"\n ],\n \"run_args\": [\n \"-cp\",\n
\"bin:\"\n ],\n \"test_args\": [\n \"-cp\",\n \"bin:\",\n
\"org.junit.runner.JUnitCore\"\n ]\n}"
- jarsPath = "/opt/apache/beam/jars/*"
+ javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\":
\"java\",\n \"test_cmd\": \"java\",\n \"compile_args\": [\n \"-d\",\n
\"bin\",\n \"-classpath\"\n ],\n \"run_args\": [\n \"-cp\",\n
\"bin:\"\n ],\n \"test_args\": [\n \"-cp\",\n \"bin:\",\n
\"org.junit.runner.JUnitCore\"\n ]\n}"
+ defaultProjectId = ""
)
var executorConfig *ExecutorConfig
@@ -90,7 +90,7 @@ func TestNewEnvironment(t *testing.T) {
{name: "create env service with default envs", want:
&Environment{
NetworkEnvs: *NewNetworkEnvs(defaultIp,
defaultPort, defaultProtocol),
BeamSdkEnvs: *NewBeamEnvs(defaultSdk,
executorConfig, preparedModDir),
- ApplicationEnvs: *NewApplicationEnvs("/app",
&CacheEnvs{defaultCacheType, defaultCacheAddress,
defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout),
+ ApplicationEnvs: *NewApplicationEnvs("/app",
defaultLaunchSite, defaultProjectId, &CacheEnvs{defaultCacheType,
defaultCacheAddress, defaultCacheKeyExpirationTime},
defaultPipelineExecuteTimeout),
}},
}
for _, tt := range tests {
@@ -98,7 +98,7 @@ func TestNewEnvironment(t *testing.T) {
if got := NewEnvironment(
*NewNetworkEnvs(defaultIp, defaultPort,
defaultProtocol),
*NewBeamEnvs(defaultSdk, executorConfig,
preparedModDir),
- *NewApplicationEnvs("/app",
&CacheEnvs{defaultCacheType, defaultCacheAddress,
defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout));
!reflect.DeepEqual(got, tt.want) {
+ *NewApplicationEnvs("/app", defaultLaunchSite,
defaultProjectId, &CacheEnvs{defaultCacheType, defaultCacheAddress,
defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout));
!reflect.DeepEqual(got, tt.want) {
t.Errorf("NewEnvironment() = %v, want %v", got,
tt.want)
}
})
@@ -205,8 +205,17 @@ func Test_getApplicationEnvsFromOsEnvs(t *testing.T) {
wantErr bool
envsToSet map[string]string
}{
- {name: "working dir is provided", want:
NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress,
defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout), wantErr: false,
envsToSet: map[string]string{workingDirKey: "/app"}},
- {name: "working dir isn't provided", want: nil, wantErr: true},
+ {
+ name: "working dir is provided",
+ want: NewApplicationEnvs("/app",
defaultLaunchSite, defaultProjectId, &CacheEnvs{defaultCacheType,
defaultCacheAddress, defaultCacheKeyExpirationTime},
defaultPipelineExecuteTimeout),
+ wantErr: false,
+ envsToSet: map[string]string{workingDirKey: "/app",
launchSiteKey: defaultLaunchSite, projectIdKey: defaultProjectId},
+ },
+ {
+ name: "working dir isn't provided",
+ want: nil,
+ wantErr: true,
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
diff --git a/playground/backend/internal/logger/cloud_logging_handler.go
b/playground/backend/internal/logger/cloud_logging_handler.go
index 62afb42..908ab4d 100644
--- a/playground/backend/internal/logger/cloud_logging_handler.go
+++ b/playground/backend/internal/logger/cloud_logging_handler.go
@@ -23,6 +23,7 @@ import (
const logId = "playground-log"
+// CloudLoggingHandler represents 'Cloud Logging' package that logs to Google
Cloud Logging service.
type CloudLoggingHandler struct {
logger *logging.Logger
client *logging.Client
diff --git a/playground/backend/internal/logger/logger.go
b/playground/backend/internal/logger/logger.go
index 9a512ea..0ffc574 100644
--- a/playground/backend/internal/logger/logger.go
+++ b/playground/backend/internal/logger/logger.go
@@ -16,22 +16,43 @@
package logger
import (
- "fmt"
+ "cloud.google.com/go/logging"
+ "context"
"log"
)
type Severity string
const (
- INFO Severity = "[INFO]:"
- WARN Severity = "[WARN]:"
- ERROR Severity = "[ERROR]:"
- FATAL Severity = "[FATAL]:"
- DEBUG Severity = "[DEBUG]:"
+ INFO Severity = "[INFO]:"
+ WARN Severity = "[WARN]:"
+ ERROR Severity = "[ERROR]:"
+ FATAL Severity = "[FATAL]:"
+ DEBUG Severity = "[DEBUG]:"
+ appEngine = "app_engine"
)
var handlers []Handler
+// SetupLogger constructs logger by application environment
+// Add handlers in root logger:
+// CloudLoggingHandler - if server running on App Engine
+// StdHandler - if server running locally
+func SetupLogger(ctx context.Context, launchSite, googleProjectId string) {
+ switch launchSite {
+ case appEngine:
+ client, err := logging.NewClient(ctx, googleProjectId)
+ if err != nil {
+ log.Fatalf("Failed to create client: %v", err)
+ }
+ cloudLogger := NewCloudLoggingHandler(client)
+ AddHandler(cloudLogger)
+ default:
+ stdLogger := NewStdHandler()
+ AddHandler(stdLogger)
+ }
+}
+
// SetHandlers set a new array of logger handlers
func SetHandlers(h []Handler) {
handlers = h
@@ -46,76 +67,58 @@ func Info(args ...interface{}) {
for _, handler := range handlers {
handler.Info(args...)
}
- logMessage(INFO, args...)
}
func Infof(format string, args ...interface{}) {
for _, handler := range handlers {
handler.Infof(format, args...)
}
- logMessage(INFO, fmt.Sprintf(format, args...))
}
func Warn(args ...interface{}) {
for _, handler := range handlers {
handler.Warn(args...)
}
- logMessage(WARN, args...)
}
func Warnf(format string, args ...interface{}) {
for _, handler := range handlers {
handler.Warnf(format, args...)
}
- logMessage(WARN, fmt.Sprintf(format, args...))
}
func Error(args ...interface{}) {
for _, handler := range handlers {
handler.Error(args...)
}
- logMessage(ERROR, args...)
}
func Errorf(format string, args ...interface{}) {
for _, handler := range handlers {
handler.Errorf(format, args...)
}
- logMessage(ERROR, fmt.Sprintf(format, args...))
}
func Debug(args ...interface{}) {
for _, handler := range handlers {
handler.Debug(args...)
}
- logMessage(DEBUG, args...)
}
func Debugf(format string, args ...interface{}) {
for _, handler := range handlers {
handler.Debugf(format, args...)
}
- logMessage(DEBUG, fmt.Sprintf(format, args...))
}
func Fatal(args ...interface{}) {
for _, handler := range handlers {
handler.Fatal(args...)
}
- args = append([]interface{}{FATAL}, args...)
- log.Fatalln(args...)
}
func Fatalf(format string, args ...interface{}) {
for _, handler := range handlers {
handler.Fatalf(format, args...)
}
- args = append([]interface{}{FATAL}, fmt.Sprintf(format, args...))
- log.Fatalln(args...)
-}
-
-// logMessage logs a message at level severity.
-func logMessage(severity Severity, args ...interface{}) {
- args = append([]interface{}{severity}, args...)
- log.Println(args...)
}
diff --git a/playground/backend/internal/logger/logger_test.go
b/playground/backend/internal/logger/logger_test.go
index 82cc69b..2bc4278 100644
--- a/playground/backend/internal/logger/logger_test.go
+++ b/playground/backend/internal/logger/logger_test.go
@@ -29,8 +29,8 @@ import (
var preparedHandler testHandler
const (
- codeFatal = "package main\n\nimport
(\n\t\"beam.apache.org/playground/backend/internal/logger\"\n)\n\nfunc main()
{\n\tlogger.Fatal(\"%v\")\n}"
- codeFatalf = "package main\n\nimport
(\n\t\"beam.apache.org/playground/backend/internal/logger\"\n)\n\nfunc main()
{\n\tlogger.Fatalf(\"%v\",\"%s\")\n}"
+ codeFatal = "package main\n\nimport
(\n\t\"beam.apache.org/playground/backend/internal/logger\"\n)\n\nfunc main()
{\n\tlogger.AddHandler(logger.StdHandler{})\n\tlogger.Fatal(\"%v\")\n}"
+ codeFatalf = "package main\n\nimport
(\n\t\"beam.apache.org/playground/backend/internal/logger\"\n)\n\nfunc main()
{\n\tlogger.AddHandler(logger.StdHandler{})\n\tlogger.Fatalf(\"%v\",\"%s\")\n}"
testLoggerDir = "testLogger"
testFatalDir = "testFatal"
testFatalfDir = "testFatalf"
diff --git a/playground/backend/internal/logger/std_handler.go
b/playground/backend/internal/logger/std_handler.go
new file mode 100644
index 0000000..1534c8f
--- /dev/null
+++ b/playground/backend/internal/logger/std_handler.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logger
+
+import (
+ "fmt"
+ "log"
+)
+
+// StdHandler represents standard 'log' package that logs to stderr
+type StdHandler struct {
+}
+
+// NewStdHandler creates StdHandler
+func NewStdHandler() *StdHandler {
+ return &StdHandler{}
+}
+
+func (h StdHandler) Info(args ...interface{}) {
+ h.logMessage(INFO, args...)
+}
+
+func (h StdHandler) Infof(format string, args ...interface{}) {
+ h.logMessage(INFO, fmt.Sprintf(format, args...))
+}
+
+func (h StdHandler) Warn(args ...interface{}) {
+ h.logMessage(WARN, args...)
+}
+
+func (h StdHandler) Warnf(format string, args ...interface{}) {
+ h.logMessage(WARN, fmt.Sprintf(format, args...))
+}
+
+func (h StdHandler) Error(args ...interface{}) {
+ h.logMessage(ERROR, args...)
+}
+
+func (h StdHandler) Errorf(format string, args ...interface{}) {
+ h.logMessage(ERROR, fmt.Sprintf(format, args...))
+}
+
+func (h StdHandler) Debug(args ...interface{}) {
+ h.logMessage(DEBUG, args...)
+}
+
+func (h StdHandler) Debugf(format string, args ...interface{}) {
+ h.logMessage(DEBUG, fmt.Sprintf(format, args...))
+}
+
+func (h StdHandler) Fatal(args ...interface{}) {
+ args = append([]interface{}{FATAL}, args...)
+ log.Fatalln(args...)
+}
+
+func (h StdHandler) Fatalf(format string, args ...interface{}) {
+ args = append([]interface{}{FATAL}, fmt.Sprintf(format, args...))
+ log.Fatalln(args...)
+}
+
+// logMessage logs a message at level severity.
+func (h StdHandler) logMessage(severity Severity, args ...interface{}) {
+ args = append([]interface{}{severity}, args...)
+ log.Println(args...)
+}