mohamedawnallah commented on issue #27038: URL: https://github.com/apache/beam/issues/27038#issuecomment-2643713037
Hi @lostluck,

I am interested in this issue and have been exploring Apache Beam metrics outside of the `Setup` methods. As part of my learning, I wrote a simple Beam Go SDK program that converts words to uppercase and ran it on a local Flink development cluster. The dev environment consists of [Apache Beam Go SDK v2.62.0](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam), [apache/beam_flink1.19_job_server](https://registry.hub.docker.com/r/apache/beam_flink1.19_job_server), and [flink:1.19.0-java11](https://registry.hub.docker.com/layers/library/flink/1.19.0-java11/images/sha256-3ca7e4d36533a6f167f4f1378c648511eb69b1f39fbb6e924ff2ff4000b5ae4e).

I observed that no custom metrics are reported by the Flink runner; only internal Beam metrics appear in the `Accumulators` tab. This behavior seems related to issue #32895. Any insights on this?

For reference, here is the simple program I used:

```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"reflect"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
)

// Define metrics.
var (
	processedCount   = metrics.NewCounter("example", "processed_count")
	invalidCount     = metrics.NewCounter("example", "invalid_count")
	sizeDistribution = metrics.NewDistribution("example", "record_size")
)

// UppercaseFn is a DoFn that processes words and updates metrics.
type UppercaseFn struct{}

func (fn *UppercaseFn) ProcessElement(word string, emit func(string)) {
	processedCount.Inc(context.Background(), 1)
	sizeDistribution.Update(context.Background(), int64(len(word)))
	if word == "" {
		invalidCount.Inc(context.Background(), 1)
		return
	}
	emit(strings.ToUpper(word))
}

// PrintFn is a DoFn that prints each element.
type PrintFn struct{}

func (fn *PrintFn) ProcessElement(word string) {
	fmt.Println(word)
}

func init() {
	beam.RegisterType(reflect.TypeOf((*UppercaseFn)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*PrintFn)(nil)).Elem())
}

func main() {
	flag.Parse()
	beam.Init()

	pipeline := beam.NewPipeline()
	scope := pipeline.Root()

	inputData := []string{"apple", "banana", "", "cherry"}
	words := beam.CreateList(scope, inputData)
	uppercaseWords := beam.ParDo(scope, &UppercaseFn{}, words)
	beam.ParDo0(scope, &PrintFn{}, uppercaseWords)

	result, err := flink.Execute(context.Background(), pipeline)
	if err != nil {
		log.Fatalf("Failed to execute pipeline: %v", err)
	}

	metrics := result.Metrics().AllMetrics()
	for _, counter := range metrics.Counters() {
		fmt.Printf("Counter %s: %d\n", counter.Name(), counter.Committed)
	}
	for _, distribution := range metrics.Distributions() {
		fmt.Printf("Distribution %s: min=%d, max=%d, sum=%d, mean=%d, count=%d\n",
			distribution.Name(),
			distribution.Committed.Min,
			distribution.Committed.Max,
			distribution.Committed.Sum,
			distribution.Committed.Sum/distribution.Committed.Count,
			distribution.Committed.Count)
	}
}
```

Looking forward to your thoughts! 🙏

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
