kamilwu commented on a change in pull request #13362:
URL: https://github.com/apache/beam/pull/13362#discussion_r527568576



##########
File path: sdks/go/test/load/util.go
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package load
+
+import (
+       "context"
+       "encoding/json"
+       "flag"
+       "fmt"
+       "io/ioutil"
+       "log"
+       "net/http"
+       "os"
+       "reflect"
+       "strings"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+const (
+       runtimeMetricNamespace = "RuntimeMonitor"
+       runtimeMetricName      = "runtime"
+)
+
+var (
+       influxMeasurement = flag.String(
+               "influx_measurement",
+               "",
+               `An InfluxDB measurement where metrics should be published to.
+               If empty, no metrics will be send to InfluxDB.`)
+       influxDatabase = flag.String(
+               "influx_db_name",
+               "",
+               "InfluxDB database name. If empty, no metrics will be send to 
InfluxDB.")
+       influxHost = flag.String(
+               "influx_hostname",
+               "http://localhost:8086";,
+               "Hostname and port to connect to InfluxDB. Defaults to 
http://localhost:8086.";)
+       influxNamespace = flag.String(
+               "influx_namespace",
+               "",
+               `A namespace to be used when constructing InfluxDB's data 
points.
+               Used to make some points different from others within the same 
measurement.`)
+       runtime = beam.NewDistribution(runtimeMetricNamespace, 
runtimeMetricName)
+)
+
+func init() {
+       beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem())
+}
+
+// RuntimeMonitor is a DoFn to record processing time in the pipeline.
+//
+// It uses a distribution metric which is updated every time a new bundle
+// starts or finishes. The processing time can be extracted by calculating
+// the difference of the maximum and the minimum value of the distribution
+// metric.
+type RuntimeMonitor struct{}
+
+// StartBundle updates a distribution metric.
+func (fn *RuntimeMonitor) StartBundle(ctx context.Context, emit func([]byte, 
[]byte)) {
+       runtime.Update(ctx, time.Now().UnixNano())
+}
+
+// FinishBundle updates a distribution metric.
+func (fn *RuntimeMonitor) FinishBundle(ctx context.Context, emit func([]byte, 
[]byte)) {
+       runtime.Update(ctx, time.Now().UnixNano())
+}
+
+// ProcessElement emits unmodified input elements.
+func (fn *RuntimeMonitor) ProcessElement(key, value []byte, emit func([]byte, 
[]byte)) {
+       emit(key, value)
+}
+
+type influxDBOptions struct {
+       measurement string
+       dbName      string
+       hostname    string
+       user        string
+       password    string
+}
+
+func newInfluxDBOptions() *influxDBOptions {
+       return &influxDBOptions{
+               measurement: *influxMeasurement,
+               dbName:      *influxDatabase,
+               hostname:    *influxHost,
+               user:        os.Getenv("INFLUXDB_USER"),
+               password:    os.Getenv("INFLUXDB_USER_PASSWORD")}
+}
+
+func (options influxDBOptions) validate() bool {
+       return options.measurement != "" && options.dbName != ""
+}
+
+func (options influxDBOptions) httpAuthEnabled() bool {
+       return options.user != "" && options.password != ""
+}
+
+// loadTestResult represents a single data record that has: a timestamp,
+// a type of a metric, and a value.
+type loadTestResult struct {
+       timestamp int64
+       metric    string
+       value     float64
+}
+
+func newLoadTestResult(value float64) loadTestResult {
+       metric := ""
+       if *influxNamespace == "" {
+               metric = runtimeMetricName
+       } else {
+               metric = fmt.Sprintf("%v_%v", *influxNamespace, 
runtimeMetricName)
+       }
+       return loadTestResult{timestamp: time.Now().Unix(), metric: metric, 
value: value}

Review comment:
       Yes, this is correct. The test is going to run every 24 hours, so second 
precision is totally fine. It works the same in Python and Java tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to