acelyc111 commented on code in PR #1466:
URL:
https://github.com/apache/incubator-pegasus/pull/1466#discussion_r1195922372
##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
import (
"context"
- "sync/atomic"
+ "math/rand"
"time"
+ "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
)
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
-
- // Start detection until the ctx cancelled. This method will block the current thread.
- Start(ctx context.Context) error
+ Start(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
- return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+ detectTimeout time.Duration, partitionCount int) Detector {
+ metaServers := viper.GetStringSlice("meta_servers")
+ tableName :=
viper.GetStringMapString("availablity_detect")["table_name"]
+ // Create detect table.
+ adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+ error := adminClient.CreateTable(context.Background(), tableName,
partitionCount)
+ if error != nil {
+ log.Errorf("Create detect table %s failed, error: %s",
tableName, error)
+ }
+ pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers:
metaServers})
+ return &pegasusDetector{
+ client: pegasusClient,
+ detectTableName: tableName,
+ detectInterval: detectInterval,
+ detectTimeout: detectTimeout,
+ partitionCount: partitionCount,
+ }
}
-type pegasusDetector struct {
- // client reads and writes periodically to a specified table.
- client pegasus.Client
- detectTable pegasus.TableConnector
+var (
+ DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "detect_times",
+ Help: "The times of availability detecting",
+ })
+
+ ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "read_failure_detect_times",
+ Help: "The failure times of read detecting",
+ })
+
+ WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "write_failure_detect_times",
+ Help: "The failure times of write detecting",
+ })
+
+ ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "read_latency",
+ Help: "The latency of read data",
+ })
+
+ WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "write_latency",
+ Help: "The latency of write data",
+ })
+)
- detectInterval time.Duration
+type pegasusDetector struct {
+ client pegasus.Client
+ detectTable pegasus.TableConnector
detectTableName string
-
- // timeout of a single detect
+ detectInterval time.Duration
+ // timeout of a single detect.
detectTimeout time.Duration
-
- detectHashKeys [][]byte
-
- recentMinuteDetectTimes uint64
- recentMinuteFailureTimes uint64
-
- recentHourDetectTimes uint64
- recentHourFailureTimes uint64
-
- recentDayDetectTimes uint64
- recentDayFailureTimes uint64
+ // partition count.
+ partitionCount int
}
-func (d *pegasusDetector) Start(rootCtx context.Context) error {
+func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
var err error
- ctx, cancel := context.WithTimeout(rootCtx, 10*time.Second)
- defer cancel()
- d.detectTable, err = d.client.OpenTable(ctx, d.detectTableName)
+ // Open the detect table.
+ d.detectTable, err = d.client.OpenTable(context.Background(),
d.detectTableName)
if err != nil {
+ log.Errorf("Open detect table %s failed, error: %s",
d.detectTable, err)
return err
}
-
ticker := time.NewTicker(d.detectInterval)
for {
select {
- case <-rootCtx.Done(): // check if context cancelled
+ case <-tom.Dying():
return nil
case <-ticker.C:
- return nil
- default:
+ d.detectPartition()
}
Review Comment:
Why delete the `default` case?
##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
import (
"context"
- "sync/atomic"
+ "math/rand"
"time"
+ "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
)
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
-
- // Start detection until the ctx cancelled. This method will block the current thread.
- Start(ctx context.Context) error
+ Start(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
- return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+ detectTimeout time.Duration, partitionCount int) Detector {
+ metaServers := viper.GetStringSlice("meta_servers")
+ tableName :=
viper.GetStringMapString("availablity_detect")["table_name"]
+ // Create detect table.
+ adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+ error := adminClient.CreateTable(context.Background(), tableName,
partitionCount)
+ if error != nil {
+ log.Errorf("Create detect table %s failed, error: %s",
tableName, error)
+ }
+ pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers:
metaServers})
+ return &pegasusDetector{
+ client: pegasusClient,
+ detectTableName: tableName,
+ detectInterval: detectInterval,
+ detectTimeout: detectTimeout,
+ partitionCount: partitionCount,
+ }
}
-type pegasusDetector struct {
- // client reads and writes periodically to a specified table.
- client pegasus.Client
- detectTable pegasus.TableConnector
+var (
+ DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "detect_times",
+ Help: "The times of availability detecting",
+ })
+
+ ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "read_failure_detect_times",
+ Help: "The failure times of read detecting",
+ })
+
+ WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "write_failure_detect_times",
+ Help: "The failure times of write detecting",
+ })
+
+ ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "read_latency",
+ Help: "The latency of read data",
+ })
+
+ WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "write_latency",
+ Help: "The latency of write data",
+ })
+)
- detectInterval time.Duration
+type pegasusDetector struct {
+ client pegasus.Client
+ detectTable pegasus.TableConnector
detectTableName string
-
- // timeout of a single detect
+ detectInterval time.Duration
+ // timeout of a single detect.
detectTimeout time.Duration
Review Comment:
```suggestion
detectTimeout time.Duration
```
##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
import (
"context"
- "sync/atomic"
+ "math/rand"
"time"
+ "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
)
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
-
- // Start detection until the ctx cancelled. This method will block the current thread.
- Start(ctx context.Context) error
+ Start(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
- return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+ detectTimeout time.Duration, partitionCount int) Detector {
+ metaServers := viper.GetStringSlice("meta_servers")
+ tableName :=
viper.GetStringMapString("availablity_detect")["table_name"]
+ // Create detect table.
+ adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+ error := adminClient.CreateTable(context.Background(), tableName,
partitionCount)
+ if error != nil {
+ log.Errorf("Create detect table %s failed, error: %s",
tableName, error)
+ }
+ pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers:
metaServers})
+ return &pegasusDetector{
+ client: pegasusClient,
+ detectTableName: tableName,
+ detectInterval: detectInterval,
+ detectTimeout: detectTimeout,
+ partitionCount: partitionCount,
+ }
}
-type pegasusDetector struct {
- // client reads and writes periodically to a specified table.
- client pegasus.Client
- detectTable pegasus.TableConnector
+var (
+ DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "detect_times",
+ Help: "The times of availability detecting",
+ })
+
+ ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "read_failure_detect_times",
+ Help: "The failure times of read detecting",
+ })
+
+ WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "write_failure_detect_times",
+ Help: "The failure times of write detecting",
+ })
+
+ ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "read_latency",
+ Help: "The latency of read data",
+ })
+
+ WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "write_latency",
+ Help: "The latency of write data",
Review Comment:
```suggestion
Help: "The latency of write data in milliseconds",
```
##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
import (
"context"
- "sync/atomic"
+ "math/rand"
"time"
+ "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
)
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
-
- // Start detection until the ctx cancelled. This method will block the current thread.
- Start(ctx context.Context) error
+ Start(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
- return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+ detectTimeout time.Duration, partitionCount int) Detector {
+ metaServers := viper.GetStringSlice("meta_servers")
+ tableName :=
viper.GetStringMapString("availablity_detect")["table_name"]
+ // Create detect table.
+ adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+ error := adminClient.CreateTable(context.Background(), tableName,
partitionCount)
+ if error != nil {
+ log.Errorf("Create detect table %s failed, error: %s",
tableName, error)
+ }
+ pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers:
metaServers})
+ return &pegasusDetector{
+ client: pegasusClient,
+ detectTableName: tableName,
+ detectInterval: detectInterval,
+ detectTimeout: detectTimeout,
+ partitionCount: partitionCount,
+ }
}
-type pegasusDetector struct {
- // client reads and writes periodically to a specified table.
- client pegasus.Client
- detectTable pegasus.TableConnector
+var (
+ DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "detect_times",
+ Help: "The times of availability detecting",
+ })
+
+ ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "read_failure_detect_times",
+ Help: "The failure times of read detecting",
+ })
+
+ WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "write_failure_detect_times",
+ Help: "The failure times of write detecting",
+ })
+
+ ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "read_latency",
+ Help: "The latency of read data",
+ })
+
+ WriteLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "write_latency",
+ Help: "The latency of write data",
+ })
+)
- detectInterval time.Duration
+type pegasusDetector struct {
+ client pegasus.Client
+ detectTable pegasus.TableConnector
detectTableName string
-
- // timeout of a single detect
+ detectInterval time.Duration
+ // timeout of a single detect.
detectTimeout time.Duration
-
- detectHashKeys [][]byte
-
- recentMinuteDetectTimes uint64
- recentMinuteFailureTimes uint64
-
- recentHourDetectTimes uint64
- recentHourFailureTimes uint64
-
- recentDayDetectTimes uint64
- recentDayFailureTimes uint64
+ // partition count.
+ partitionCount int
Review Comment:
```suggestion
partitionCount int
```
##########
collector/avail/detector.go:
##########
@@ -19,106 +19,134 @@ package avail
import (
"context"
- "sync/atomic"
+ "math/rand"
"time"
+ "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "gopkg.in/tomb.v2"
)
// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
-
- // Start detection until the ctx cancelled. This method will block the current thread.
- Start(ctx context.Context) error
+ Start(tom *tomb.Tomb) error
}
// NewDetector returns a service-availability detector.
-func NewDetector(client pegasus.Client) Detector {
- return &pegasusDetector{client: client}
+func NewDetector(detectInterval time.Duration,
+ detectTimeout time.Duration, partitionCount int) Detector {
+ metaServers := viper.GetStringSlice("meta_servers")
+ tableName :=
viper.GetStringMapString("availablity_detect")["table_name"]
+ // Create detect table.
+ adminClient := admin.NewClient(admin.Config{MetaServers: metaServers})
+ error := adminClient.CreateTable(context.Background(), tableName,
partitionCount)
+ if error != nil {
+ log.Errorf("Create detect table %s failed, error: %s",
tableName, error)
+ }
+ pegasusClient := pegasus.NewClient(pegasus.Config{MetaServers:
metaServers})
+ return &pegasusDetector{
+ client: pegasusClient,
+ detectTableName: tableName,
+ detectInterval: detectInterval,
+ detectTimeout: detectTimeout,
+ partitionCount: partitionCount,
+ }
}
-type pegasusDetector struct {
- // client reads and writes periodically to a specified table.
- client pegasus.Client
- detectTable pegasus.TableConnector
+var (
+ DetectTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "detect_times",
+ Help: "The times of availability detecting",
+ })
+
+ ReadFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "read_failure_detect_times",
+ Help: "The failure times of read detecting",
+ })
+
+ WriteFailureTimes = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "write_failure_detect_times",
+ Help: "The failure times of write detecting",
+ })
+
+ ReadLatency = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "read_latency",
+ Help: "The latency of read data",
Review Comment:
```suggestion
Help: "The latency of read data in milliseconds",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]