Hi Beam Committers,

I am a developer on the Beam Samza runner. We are seeing an issue where
our users fail to update metrics from threads they create themselves, and I
am wondering if anyone has suggestions.

Problem:
MetricsContainer
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java>
is stored in a ThreadLocal in MetricsEnvironment
<https://github.com/apache/beam/commits/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java>.
Whenever inc() is called on a DelegatingCounter
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java>,
it looks up the MetricsContainer of the current thread and updates the
corresponding CounterCell. For the Samza runner, we have a
FnWithMetricsWrapper
<https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java>
that sets the MetricsContainer for the current thread before each DoFn runs.
However, if users create their own threads inside a ParDo function and try
to update metrics from those threads, the updates fail and they see the
error log "Unable to update metrics on the current thread..."
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java#L119>.

Example:

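pipeline
    .apply(Create.of(inputData))
    .apply(ParDo.of(new DoFn<KV<String, String>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        // Runs on the DoFn thread, where the runner has set the
        // MetricsContainer: this update is recorded.
        Metrics.counter("test", "counter1").inc();
        Thread thread = new Thread(() -> {
          // Runs on a new thread with no MetricsContainer set:
          // this update is not recorded and the error above is logged.
          Metrics.counter("test", "counter2").inc();
        }, "a user-defined thread");
        thread.start();
      }
    }));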

In this case, counter1 is updated but counter2 is not, because no
MetricsContainer has been set on the user-defined thread.
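For anyone who wants to poke at the behavior without a full pipeline, here is a minimal plain-Java model of it. The names (ThreadLocalMetricsDemo, ToyContainer, CURRENT, inc) are hypothetical stand-ins, not Beam's MetricsContainer, MetricsEnvironment or DelegatingCounter; the sketch only assumes what is described above: the container lives in a ThreadLocal, the runner sets it on the DoFn thread, and an update on a thread without a container is not recorded.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy model of a per-thread metrics container; these are NOT Beam classes.
public class ThreadLocalMetricsDemo {

  // Stand-in for MetricsContainer: counter values keyed by name.
  static final class ToyContainer {
    final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    long get(String name) {
      AtomicLong value = counters.get(name);
      return value == null ? 0L : value.get();
    }
  }

  // Stand-in for the ThreadLocal kept by MetricsEnvironment.
  static final ThreadLocal<ToyContainer> CURRENT = new ThreadLocal<>();

  // Stand-in for DelegatingCounter.inc(): uses whatever container the calling
  // thread has, and returns false in the case where Beam logs
  // "Unable to update metrics on the current thread...".
  static boolean inc(String name) {
    ToyContainer container = CURRENT.get();
    if (container == null) {
      return false;
    }
    container.counters.computeIfAbsent(name, k -> new AtomicLong()).incrementAndGet();
    return true;
  }

  // Mirrors the ParDo example; returns {counter1, counter2, childUpdated ? 1 : 0}.
  public static long[] runExample() {
    ToyContainer container = new ToyContainer();
    CURRENT.set(container); // what the runner's wrapper does before the DoFn runs
    boolean[] childUpdated = new boolean[1];
    try {
      inc("counter1"); // same thread as the wrapper: recorded
      Thread thread = new Thread(
          () -> childUpdated[0] = inc("counter2"), // new thread: CURRENT.get() is null
          "a user-defined thread");
      thread.start();
      thread.join(); // waiting does not help: the new thread never had a container
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      CURRENT.remove();
    }
    return new long[] {
      container.get("counter1"), container.get("counter2"), childUpdated[0] ? 1L : 0L
    };
  }

  public static void main(String[] args) {
    long[] result = runExample();
    System.out.println("counter1=" + result[0] + " counter2=" + result[1]
        + " childUpdated=" + (result[2] == 1L));
  }
}
```

Running main prints counter1=1 counter2=0 childUpdated=false. The join() is there to show that this is not a race: a value stored in a plain ThreadLocal is simply not visible on a newly created thread, no matter how long the DoFn waits.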

We have no control over user-defined threads, so it seems impossible for
our runner to set the MetricsContainer for them. Could someone suggest, from
either a runner developer's or a user's perspective, how to make this use
case work?

Thanks,
Yixing
