+ Max, Robert, dev@

Your assessment is correct, Łukasz! We need to get metrics sent over the job API from the JobServicePipelineResult in the job service to the "portable harness" that the user is running directly. Robert added the RPC for that in #8018 <https://github.com/apache/beam/pull/8018>, but the code to use it is incomplete/unmerged.
I implemented one way of doing this in #7934 <https://github.com/apache/beam/pull/7934>, but ultimately Alex, Robert, and I decided a different approach made more sense (detailed below). No one has built that yet, and unfortunately I don't have time to for the foreseeable future, so it is stalled.

Here's an explanation of the possible approaches:

In Java, there are existing MetricResult <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java> (s <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java>) classes in use by "legacy" runners. However, portable metrics come from the SDK harness as MonitoringInfo protos <https://github.com/apache/beam/blob/v2.11.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java#L150-L161>, which have a more flexible data model.

#7915 <https://github.com/apache/beam/pull/7915> (closed) expanded the SDK classes' underlying data models to mirror MonitoringInfos', so that we could losslessly convert between MIs and SDK representations (and back). This supported a flow where:

- Metrics come in from the SDK harness as MIs
- They're converted to, and stored as, SDK MetricResults
- Later, they're sent over the job API as MIs
- The portable harness receives them and converts them back to SDK form (to serve SDK MetricResults queries with the existing API)

However, converting between MI and SDK forms (including widening the latter to make this possible) required more code and churn than folks were comfortable with, and is arguably not the right design.

Another approach would be to have metrics classes like MetricsContainerStepMap <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java> store some metrics as SDK MetricResults and others as MIs.
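To make that four-step flow concrete, here's a rough, self-contained sketch. Note these are simplified stand-in types for illustration only, not the real Beam classes: the actual MonitoringInfo proto and SDK MetricResult/MetricKey have more fields, and the urn and label keys below are illustrative.

```java
import java.util.Map;

// Hedged sketch of the MI <-> SDK round trip described above.
// Stand-in types, NOT the real Beam classes.
public class MetricsRoundTrip {

  // Stand-in for the MonitoringInfo proto: urn + labels + a counter payload.
  public record MonitoringInfo(String urn, Map<String, String> labels, long counterValue) {}

  // Stand-ins for the SDK-side MetricKey / MetricResult.
  public record MetricKey(String step, String namespace, String name) {}
  public record MetricResult(MetricKey key, long attempted) {}

  // Step 2 of the flow: convert an incoming MI to SDK form for storage.
  public static MetricResult fromMonitoringInfo(MonitoringInfo mi) {
    return new MetricResult(
        new MetricKey(
            mi.labels().get("PTRANSFORM"),
            mi.labels().get("NAMESPACE"),
            mi.labels().get("NAME")),
        mi.counterValue());
  }

  // Step 3: convert back to MI form to send over the job API.
  public static MonitoringInfo toMonitoringInfo(MetricResult r) {
    return new MonitoringInfo(
        "beam:metric:user:sum_int64:v1", // illustrative user-counter urn
        Map.of(
            "PTRANSFORM", r.key().step(),
            "NAMESPACE", r.key().namespace(),
            "NAME", r.key().name()),
        r.attempted());
  }

  public static void main(String[] args) {
    MonitoringInfo mi = new MonitoringInfo(
        "beam:metric:user:sum_int64:v1",
        Map.of("PTRANSFORM", "step1", "NAMESPACE", "myNs", "NAME", "elements"),
        42L);
    MetricResult sdkForm = fromMonitoringInfo(mi);
    // The property #7915 needed: a lossless round trip back to the same MI.
    System.out.println(mi.equals(toMonitoringInfo(sdkForm))); // prints "true"
  }
}
```

The whole difficulty in the real codebase is that the actual types don't line up this cleanly, which is what the "widening" in #7915 was for.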
A version of this is currently somewhat hacked in, with an "unbound container <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L50>" for "system" metrics that aren't keyed off a PTransform name, and a MonitoringInfoMetricName <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java#L32-L37> that inherits MetricName but wraps a whole MI (which more closely corresponds to a MetricResult; here's more discussion of how SDK classes might logically map to MI fields <https://docs.google.com/document/d/1m83TsFvJbOlcLfXVXprQm1B7vUakhbLZMzuRrOHWnTg/edit#heading=h.mmkhv7o7bcsb>).

We could continue that approach: when it's time to send metrics over the job API, we'd `instanceof` the MetricNames to see which ones wrap MIs, and send only those over the job API. I was uneasy with that (partly because, in practice, metrics will always all be in one form or the other, depending on whether the runner is portable or not). I would think it clearer to [use a common form across legacy and portable runners] xor [route them through separate code paths, if their models are irreconcilable], but the former was decided against and the latter may also be more work than it's worth.

hth!
-Ryan

On Fri, Mar 29, 2019 at 1:41 PM Łukasz Gajowy <[email protected]> wrote:

> Hello,
>
> thanks, Pablo, for gathering the right people in this thread. I'm indeed
> interested in metrics-related work. Currently, I'm setting up
> infrastructure for Beam's Jenkins to be able to run portable load-test
> jobs on a Flink cluster. Therefore I'd like to ask: what is the status of
> portable metrics for the Flink runner? Should I be able to read
> distribution and counter metrics from PipelineResult (in both the Java
> and Python SDKs)?
> If I understood the output from the load tests I ran locally correctly,
> the metrics get collected from pipelines, but reading them and returning
> them to the pipeline result is not there yet. Am I right here?
>
> In any case, maybe there's something you think I could implement in this
> area? I'd be happy to help! :)
>
> Best regards,
> *Łukasz Gajowy,*
> Polidea | Senior Software Engineer
>
> M: +48 510 254 681
> E: [email protected]
>
>
> On Mon, Mar 25, 2019 at 6:51 PM Pablo Estrada <[email protected]> wrote:
>
>> Hello everyone,
>> Łukasz is developing a pipeline that runs a large-scale load test on the
>> portable Flink runner. For these load tests, we're using the Metrics API
>> to collect information about the job (e.g. throughput, latency, runtime,
>> etc.).
>>
>> Łukasz is 1) curious about your work and interested in relying on it for
>> the load tests, and 2) happy to help if help would be useful.
>>
>> Can you share what your plans are / how the work is going on supporting
>> the Metrics API on Flink clusters?
>> Thanks!
>> -P.
>>
>
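For concreteness, the `instanceof` dispatch Ryan describes in his reply could look roughly like the sketch below. These are stand-in classes, not Beam's real MetricName / MonitoringInfoMetricName; the point is just the type check that picks out the MI-backed names when it's time to send metrics over the job API.

```java
import java.util.List;

// Hedged sketch of the `instanceof` dispatch: stand-in classes only.
public class MetricNameDispatch {

  // Stand-in for Beam's MetricName base class.
  public static class MetricName {
    public final String namespace, name;
    public MetricName(String namespace, String name) {
      this.namespace = namespace;
      this.name = name;
    }
  }

  // Stand-in for MonitoringInfoMetricName: a MetricName that wraps a
  // whole MI, represented here just by its urn.
  public static class MonitoringInfoMetricName extends MetricName {
    public final String urn;
    public MonitoringInfoMetricName(String urn) {
      super("beam", urn);
      this.urn = urn;
    }
  }

  // Only MI-backed names would go over the job API; plain user-metric
  // names are filtered out by the type check.
  public static List<String> urnsToSend(List<MetricName> names) {
    return names.stream()
        .filter(n -> n instanceof MonitoringInfoMetricName)
        .map(n -> ((MonitoringInfoMetricName) n).urn)
        .toList();
  }

  public static void main(String[] args) {
    List<MetricName> names = List.of(
        new MetricName("myNs", "userCounter"),
        new MonitoringInfoMetricName("beam:metric:element_count:v1"));
    System.out.println(urnsToSend(names)); // prints "[beam:metric:element_count:v1]"
  }
}
```

Ryan's unease makes sense here: in any given run the list is all one subtype or all the other, so the dispatch never actually mixes.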
