lostluck commented on a change in pull request #15657: URL: https://github.com/apache/beam/pull/15657#discussion_r728629054
##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestSampler(t *testing.T) {
+	ctx := context.Background()
+	pctx := SetPTransformID(ctx, "transform")
+	st := GetStore(pctx)
+
+	s := NewSampler(pctx, st)
+
+	SetPTransformState(pctx, "transform", StartBundle)
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	SetPTransformState(pctx, "transform", ProcessBundle)
+	IncTransition(pctx)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	SetPTransformState(pctx, "transform", FinishBundle)
+	IncTransition(pctx)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Stop(pctx, 200*time.Millisecond)

Review comment:
Why is `Stop` taking an increment value?
We don't know what the time difference has been since the last sample call. It's OK if we lose a bit of time at the end. It's not a precise sampler.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestSampler(t *testing.T) {
+	ctx := context.Background()
+	pctx := SetPTransformID(ctx, "transform")
+	st := GetStore(pctx)
+
+	s := NewSampler(pctx, st)
+
+	SetPTransformState(pctx, "transform", StartBundle)

Review comment:
Since `pctx` has already had the PTransform ID set, it is known to be the context for PTransform "transform", so `SetPTransformState` doesn't need the transform ID passed in on every single call. It's already being passed in the context.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestSampler(t *testing.T) {

Review comment:
Here's some pseudocode for how a basic test should look. The goal is a simple, straightforward test case. Make it as simple as possible, without sleeps or starting goroutines. Don't validate correctness with concurrency if you don't have to. You do not have to in this case.

The point is that the test plays the role of all 3 involved threads: the one setting the state (the SetState calls), the one sampling the state (the Sample calls), and the one reading out the metrics for export (or whatever other internal details we can look at for the purpose of this test).

Copy and paste this, make it real code, make it pass.

```
1. Create an initialized "bundle" context.
2. Create a sampler object s, from the bundle context.
3. Create a "PTransform" context for pid "transform", based on the bundle context.

// Here the ptransform start bundle call happened, and it's taken a bit of time, so a sample is taken during StartBundle
4. SetState(pctx, StartBundle)
5. s.Sample(200ms)

// Validate that that works properly.
6.a Validate that the StartBundle state for pid "transform" has 200 ms.
6.b Validate that the ProcessBundle state for pid "transform" has 0 ms.
6.c Validate that the FinishBundle state for pid "transform" has 0 ms.
6.d Validate that the TotalProcessingTime state for pid "transform" has 200 ms.
6.e Validate that the number of transitions so far is 1.

// Here the PTransform is processing a few elements and gets sampled during a couple of them.
7. SetState(pctx, Process)
8. s.Sample(200ms)
9. SetState(pctx, Process)
10. SetState(pctx, Process)
11. s.Sample(200ms)

12.a Validate that the StartBundle state for pid "transform" has 200 ms.
12.b Validate that the ProcessBundle state for pid "transform" has 400 ms.
12.c Validate that the FinishBundle state for pid "transform" has 0 ms.
12.d Validate that the TotalProcessingTime state for pid "transform" has 600 ms.
12.e Validate that the number of transitions so far is 4.
12.f Validate that the "time since last transition" is 0.

// Pretend the ptransform is busy with a single element.
13. s.Sample(200ms)
14. s.Sample(200ms)
15. s.Sample(200ms)

// Check that additional samples have accumulated while the PTransform was "busy" with one element.
16.a Validate that the StartBundle state for pid "transform" has 200 ms.
16.b Validate that the ProcessBundle state for pid "transform" has 1000 ms.
16.c Validate that the FinishBundle state for pid "transform" has 0 ms.
16.d Validate that the TotalProcessingTime state for pid "transform" has 1200 ms.
16.e Validate that the number of transitions so far is 4.
16.f Validate that the "time since last transition" is 600ms.

17. SetState(pctx, FinishBundle)

// Check that the final metric readout matches what we expect.
18.a Validate that the StartBundle state for pid "transform" has 200 ms.
18.b Validate that the ProcessBundle state for pid "transform" has 1000 ms.
18.c Validate that the FinishBundle state for pid "transform" has 0 ms.
18.d Validate that the TotalProcessingTime state for pid "transform" has 1200 ms.
18.e Validate that the number of transitions so far is 5.
18.f Validate that the "time since last transition" is 0ms.

// Nothing needs to be done here since sampling has "stopped" and the final metrics have been read out.
```
As always, if you aren't sure about something, ask.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestSampler(t *testing.T) {
+	ctx := context.Background()
+	pctx := SetPTransformID(ctx, "transform")
+	st := GetStore(pctx)
+
+	s := NewSampler(pctx, st)
+
+	SetPTransformState(pctx, "transform", StartBundle)
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)

Review comment:
In this test, the point is to never actually sleep, and never actually spin off new goroutines. Do everything in a single linear thread. No asynchrony. Do not call time.Sleep in this test. If you are, something has gone wrong.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestSampler(t *testing.T) {
+	ctx := context.Background()
+	pctx := SetPTransformID(ctx, "transform")
+	st := GetStore(pctx)
+
+	s := NewSampler(pctx, st)
+
+	SetPTransformState(pctx, "transform", StartBundle)
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	time.Sleep(200 * time.Millisecond)
+	go s.Start(pctx, 200*time.Millisecond)
+
+	SetPTransformState(pctx, "transform", ProcessBundle)
+	IncTransition(pctx)

Review comment:
I've mentioned this before. Setting the state should increment the transitions at the same time. One call should do both things. Every time the state is set, *that* is a transition and must be counted.
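
For illustration only, the numbered outline in the `TestSampler` comment above could be replayed in Go roughly as below. This is a minimal sketch, not the Beam `metrics` package: the `sampler` type, `SetState`, `Sample`, `Snapshot` and `runOutline` are all hypothetical names, and the rule for resetting "time since last transition" is an assumption chosen so the readouts match the values in the outline. The relevant properties are that setting the state counts the transition in the same call, and that the whole sequence runs on one goroutine with no `time.Sleep`.

```go
package main

import (
	"fmt"
	"time"
)

// bundleState is a hypothetical stand-in for the three states in the outline.
type bundleState int

const (
	StartBundle bundleState = iota
	ProcessBundle
	FinishBundle
	numStates // number of states, used to size the per-state array
)

// sampler is a hypothetical single-threaded model, not the Beam implementation.
type sampler struct {
	current     bundleState
	perState    [numStates]time.Duration
	transitions int64
	seen        int64         // transition count observed by the previous Sample
	since       time.Duration // time since the last transition
}

// SetState sets the state and counts the transition in the same call.
func (s *sampler) SetState(st bundleState) {
	s.current = st
	s.transitions++
	s.since = 0 // assumption: a transition resets the timer immediately
}

// Sample attributes d to the current state. If a transition happened since
// the previous sample the timer stays at 0; otherwise it grows by d.
func (s *sampler) Sample(d time.Duration) {
	s.perState[s.current] += d
	if s.transitions != s.seen {
		s.seen = s.transitions
		s.since = 0
		return
	}
	s.since += d
}

// snapshot is the readout checked at steps 6, 12, 16 and 18 (milliseconds).
type snapshot struct {
	StartMs, ProcessMs, FinishMs, TotalMs, SinceMs, Transitions int64
}

// Snapshot reads out the accumulated values without modifying the sampler.
func (s *sampler) Snapshot() snapshot {
	start := s.perState[StartBundle].Milliseconds()
	process := s.perState[ProcessBundle].Milliseconds()
	finish := s.perState[FinishBundle].Milliseconds()
	return snapshot{start, process, finish, start + process + finish,
		s.since.Milliseconds(), s.transitions}
}

// runOutline replays steps 4 to 18 with no sleeps and no goroutines.
func runOutline() [4]snapshot {
	const period = 200 * time.Millisecond
	var out [4]snapshot
	s := &sampler{}

	s.SetState(StartBundle) // step 4
	s.Sample(period)        // step 5
	out[0] = s.Snapshot()   // step 6

	s.SetState(ProcessBundle) // step 7
	s.Sample(period)          // step 8
	s.SetState(ProcessBundle) // step 9
	s.SetState(ProcessBundle) // step 10
	s.Sample(period)          // step 11
	out[1] = s.Snapshot()     // step 12

	for i := 0; i < 3; i++ { // steps 13 to 15
		s.Sample(period)
	}
	out[2] = s.Snapshot() // step 16

	s.SetState(FinishBundle) // step 17
	out[3] = s.Snapshot()    // step 18
	return out
}

func main() {
	for i, snap := range runOutline() {
		fmt.Printf("checkpoint %d: %+v\n", i, snap)
	}
}
```

In a real test the four checkpoints would be compared against expected values with `t.Errorf`, and the sample period would be passed in explicitly as it is here, so the test never depends on wall-clock time.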
