lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r726568585



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -121,6 +129,12 @@ func (n *ParDo) ProcessElement(_ context.Context, elm 
*FullValue, values ...ReSt
                return errors.Errorf("invalid status for pardo %v: %v, want 
Active", n.UID, n.status)
        }
 
+       // set current state for execution time metrics
+       // metrics.GetExecutionStore(n.ctx).SetState(metrics.ProcessBundle)
+       // metrics.GetExecutionStore(n.ctx).IncTransitions()
+       metrics.SetState(n.ctx, metrics.ProcessBundle)
+       metrics.IncTransition(n.ctx)

Review comment:
       Why can't we increment transitions in the SetState call? I would think 
we need to do so on every call to SetState?

##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -102,6 +103,19 @@ func shortIdsToInfos(shortids []string) 
map[string]*pipepb.MonitoringInfo {
        return defaultShortIDCache.shortIdsToInfos(shortids)
 }
 
+func getUrn(i int) metricsx.Urn {
+       switch i {
+       case 0:
+               return metricsx.UrnStartBundle
+       case 1:
+               return metricsx.UrnProcessBundle
+       case 2:
+               return metricsx.UrnFinishBundle
+       default:

Review comment:
       We seem to be missing the "metricsx.UrnTransformTotalTime" Urn as well.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/monitoring.go
##########
@@ -102,6 +103,19 @@ func shortIdsToInfos(shortids []string) 
map[string]*pipepb.MonitoringInfo {
        return defaultShortIDCache.shortIdsToInfos(shortids)
 }
 
+func getUrn(i int) metricsx.Urn {

Review comment:
       Please move this function to the metricsx package. Call it 
`ExecutionMsecUrn`.
   
   "getUrn" is too general for what it ends up doing.

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -112,6 +114,8 @@ func (ctx *beamCtx) Value(key interface{}) interface{} {
                        }
                }
                return ctx.store
+       case exStoreKey:
+               return ctx.exStore

Review comment:
       This must return a pointer to the atomic.Value (`&ctx.exStore`); 
otherwise it will end up being copied, which is not allowed.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+       "context"
+       "sync/atomic"
+       "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+       done  chan (int) // signal to stop sampling
+       e     *ExecutionTracker
+       store *Store
+       pid   string // PTransform ID

Review comment:
       You've called this PTransformID but you've passed in the BundleID. The 
PTransformIDs aren't available until exec.ParDo.
   
   In the Beam Model, a sequence of PTransforms + a set of Data are executed in 
Bundles. Bundles are the unit of re-trying. But the goal with these metrics is 
to find where time is being spent in PTransforms, divided up into Start, 
Process, and Finish segments, and also total time spent with a given PTransform 
which would be the sum of the Start, Process, Finish portions.

##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -79,6 +80,7 @@ type beamCtx struct {
        bundleID, ptransformID string
        store                  *Store
        cs                     *ptCounterSet
+       exStore                atomic.Value

Review comment:
       Please add a comment about what type should be stored in this field.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -316,10 +316,17 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                if err != nil {
                        return fail(ctx, instID, "Failed: %v", err)
                }
+               // Start the sampler goroutine here
+               sampler := newSampler(ctx, store, string(instID))
+               sampler.start()
 
                data := NewScopedDataManager(c.data, instID)
                state := NewScopedStateReaderWithCache(c.state, instID, c.cache)
                err = plan.Execute(ctx, string(instID), exec.DataContext{Data: 
data, State: state})
+
+               // Plan execution complete, stop sampling for metrics for this 
bundle.
+               sampler.stop()

Review comment:
       Small nit: Move this call to after the data and state `Close` calls, or 
move the sampler start to between where the state is created and where the 
plan is executed.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+       "context"
+       "sync/atomic"
+       "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+       done  chan (int) // signal to stop sampling
+       e     *ExecutionTracker
+       store *Store
+       pid   string // PTransform ID
+}
+
+// NewSampler creates a new state sampler.
+func NewSampler(ctx context.Context, store *Store, pid string) StateSampler {
+       return StateSampler{done: make(chan int), e: getExecutionStore(ctx), 
store: store, pid: pid}
+}
+
+func (s *StateSampler) Start() {
+       s.startSampler()
+}
+
+func (s *StateSampler) startSampler() {
+       for {
+               select {
+               case <-s.done:
+                       return
+               default:
+                       if atomic.LoadInt64(&s.e.NumberOfTransitions) != 
atomic.LoadInt64(&s.e.TransitionsAtLastSample) {
+                               s.sample()
+                       }
+
+                       // TODO: constant for sampling period
+                       atomic.AddInt64(&s.e.MillisSinceLastTransition, 200)
+                       atomic.AddInt64(&s.e.State.TotalTimeMillis, 200)
+                       time.Sleep(200 * time.Millisecond)
+               }
+       }
+}
+
+func (s *StateSampler) Stop() {
+       s.stopSampler()
+}
+
+func (s *StateSampler) stopSampler() {
+       s.done <- 1
+       // collect the remaining metrics (finish bundle metrics)
+       s.sample()
+}
+
+func (s *StateSampler) sample() {
+       s.store.mu.Lock()
+       defer s.store.mu.Unlock()
+       s.e.TransitionsAtLastSample = s.e.NumberOfTransitions
+       s.e.MillisSinceLastTransition = 0
+       s.store.AddState(s.e.State, s.pid)

Review comment:
       The AddState call here should probably be passing in the 
MillisSinceLastTransition before it gets zeroed, so it can be added to the 
current State.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+       "context"
+       "sync/atomic"
+       "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+       done  chan (int) // signal to stop sampling
+       e     *ExecutionTracker
+       store *Store
+       pid   string // PTransform ID
+}
+
+// NewSampler creates a new state sampler.
+func NewSampler(ctx context.Context, store *Store, pid string) StateSampler {
+       return StateSampler{done: make(chan int), e: getExecutionStore(ctx), 
store: store, pid: pid}
+}
+
+func (s *StateSampler) Start() {
+       s.startSampler()
+}
+
+func (s *StateSampler) startSampler() {
+       for {
+               select {
+               case <-s.done:
+                       return
+               default:
+                       if atomic.LoadInt64(&s.e.NumberOfTransitions) != 
atomic.LoadInt64(&s.e.TransitionsAtLastSample) {
+                               s.sample()
+                       }
+
+                       // TODO: constant for sampling period
+                       atomic.AddInt64(&s.e.MillisSinceLastTransition, 200)
+                       atomic.AddInt64(&s.e.State.TotalTimeMillis, 200)
+                       time.Sleep(200 * time.Millisecond)

Review comment:
       Move the contents of this loop to its own method.
   
   Move the loop and stop management to the harness package sampler.
   
   This separation will make things easier to test, avoid having a hard-coded 
constant, and allow the sampling period to be configured.

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
        gauges        map[nameHash]*gauge
 }
 
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+       StartBundle   BundleProcState = "START_BUNDLE"
+       ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+       FinishBundle  BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+       State           BundleProcState
+       IsProcessing    bool // set to true when sent as a response to 
ProcessBundleProgress Request
+       TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time 
metrics.
+type ExecutionStateTracker struct {
+       CurrentState              BundleProcState
+       State                     ExecutionState
+       NumberOfTransitions       int
+       MillisSinceLastTransition int
+       TransitionsAtLastSample   int
+       LastSampleTimeMillis      int
+}
+
 // Store retains per transform countersets, intended for per bundle use.
 type Store struct {
        mu  sync.RWMutex
        css []*ptCounterSet
 
        store map[Labels]userMetric
+
+       executionStore ExecutionStateTracker
+       stateRegistry  map[Labels][]ExecutionState
 }
 
 func newStore() *Store {
-       return &Store{store: make(map[Labels]userMetric)}
+       return &Store{store: make(map[Labels]userMetric), stateRegistry: 
make(map[Labels][]ExecutionState)}
+}
+
+// AddState adds an ExecutionState of a PTransform to the stateRegistry.
+func (s *Store) AddState(e ExecutionState, pid string) {
+       label := PTransformLabels(pid)
+       if s.stateRegistry == nil {
+               s.stateRegistry = make(map[Labels][]ExecutionState)
+       }
+       s.stateRegistry[label] = append(s.stateRegistry[label], e)

Review comment:
       We still have the memory leak issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to