On Fri, Mar 29, 2019 at 2:44 PM Ryan Williams <[email protected]> wrote:

> + Max, Robert, dev@
>
> Your assessment is correct, Łukasz! We need to get metrics sent over the
> job API from the JobServicePipelineResult in the job service to the
> "portable harness" that the user is running directly.
>

One correction: this should have been "from the JobInvocation in the job
service to the JobServicePipelineResult in the 'portable harness'".

Here are the job-service
<https://github.com/apache/beam/pull/7934/files#diff-d4d147290be404469ebc926e593a187eR339>
and portable-harness
<https://github.com/apache/beam/pull/7934/files#diff-284acf2526fcf07c05ee44aefbd9d0c3R127>
sides of that from #7934.

> Robert added the RPC for that in #8018
> <https://github.com/apache/beam/pull/8018>, but the code to use it is
> incomplete/unmerged.
>
> I implemented one way of doing this in #7934
> <https://github.com/apache/beam/pull/7934>, but ultimately Alex, Robert,
> and I decided a different approach made more sense (detailed below). No
> one has built that yet, and I don't have time for the foreseeable future,
> unfortunately, so it is stalled.
>
> Here's an explanation of the possible approaches:
>
> In Java, there are existing MetricResult
> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java>
> and MetricResults
> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java>
> classes in use by "legacy" runners. However, portable metrics come from
> the SDK harness as MonitoringInfo protos
> <https://github.com/apache/beam/blob/v2.11.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java#L150-L161>,
> which have a more flexible data model.
>
> #7915 <https://github.com/apache/beam/pull/7915> (closed) expands the SDK
> classes' underlying data models to mirror MonitoringInfos', so that we
> could losslessly convert between MIs and SDK representations (and back).
>
> This supported a flow where:
>
>    - Metrics come in from the SDK harness as MIs
>    - They're converted to + stored as SDK MetricResults
>    - Later, they're sent over the job API as MIs
>    - The portable harness receives them and converts them back to SDK
>    form (to serve SDK MetricResults queries with the existing API).
>
> However, converting between MI- and SDK-forms (including widening the
> latter to make this possible) required more code and churn than folks
> were comfortable with, and is arguably not the right design.
>
> Another approach would be to have metrics classes like
> MetricsContainerStepMap
> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java>
> store some metrics as SDK MetricResults and others as MIs.
>
> A version of this is currently somewhat hacked in, with an "unbound
> container
> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L50>"
> for "system" metrics that aren't keyed off a PTransform name, and a
> MonitoringInfoMetricName
> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java#L32-L37>
> that inherits MetricName but wraps a whole MI (which more closely
> corresponds to a MetricResult; here's more discussion of how SDK classes
> might logically map to MI fields
> <https://docs.google.com/document/d/1m83TsFvJbOlcLfXVXprQm1B7vUakhbLZMzuRrOHWnTg/edit#heading=h.mmkhv7o7bcsb>).
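
To make that wrapping hack a bit more concrete, here is a rough, untested
sketch of the role MonitoringInfoMetricName plays. The class name,
constructor, and getMonitoringInfo() accessor below are made up (see the
linked v2.11 source for the real fields), and the proto's Java package may
differ by Beam version. A similar sketch of the `instanceof` dispatch
described just below is in a P.S. at the bottom of this mail.

// Sketch only: approximates what MonitoringInfoMetricName does, not its
// actual v2.11 API. Assumption: a MetricName subclass that carries a whole
// MonitoringInfo proto so it can later be shipped over the job API as-is.
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.sdk.metrics.MetricName;

public class MonitoringInfoBackedName extends MetricName {  // hypothetical
  private final MonitoringInfo mi;

  public MonitoringInfoBackedName(MonitoringInfo mi) {
    this.mi = mi;
  }

  /** Expose the wrapped proto so job-API code can forward it unchanged. */
  public MonitoringInfo getMonitoringInfo() {
    return mi;
  }

  @Override
  public String getNamespace() {
    // MIs are identified by URN + labels rather than (namespace, name);
    // squeezing the URN into the namespace slot is the "hack" part.
    return mi.getUrn();
  }

  @Override
  public String getName() {
    // Assumed label key, purely illustrative.
    return mi.getLabelsOrDefault("NAME", "");
  }
}
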
> We could continue that approach, and when it's time to send metrics over
> the job API, we'd `instanceof` the MetricNames to see which ones contain
> MIs, and only send those over the job API.
>
> I was uneasy with that (partly because metrics will always all be in one
> form or the other, depending on whether the runner is portable or not). I
> would think it clearer to [use a common form across legacy and portable
> runners] xor [route them through separate code paths, if their models are
> irreconcilable], but the former was decided against and the latter may
> also be more work than it's worth.
>
> hth!
>
> -Ryan
>
> On Fri, Mar 29, 2019 at 1:41 PM Łukasz Gajowy <[email protected]>
> wrote:
>
>> Hello,
>>
>> thanks, Pablo, for gathering the right people in this thread. I'm indeed
>> interested in metrics-related work. Currently, I'm setting up
>> infrastructure for Beam's Jenkins to be able to run portable load-test
>> jobs on a Flink cluster. Therefore I'd like to ask: what is the status
>> of portable metrics for the Flink runner? Should I be able to read
>> distribution and counter metrics from PipelineResult (in both the Java
>> and Python SDKs)? If I understood the output from the load tests I ran
>> locally, the metrics get collected from the pipelines; only reading them
>> back and returning them to the pipeline result is not there yet. Am I
>> right here?
>>
>> In any case, maybe there's something you think I could implement in this
>> area? I'd be happy to help! :)
>>
>> Best regards,
>> *Łukasz Gajowy,*
>> Polidea | Senior Software Engineer
>>
>> M: +48 510 254 681
>> E: [email protected]
>>
>>
>> On Mon, Mar 25, 2019 at 6:51 PM Pablo Estrada <[email protected]> wrote:
>>
>>> Hello everyone,
>>> Łukasz is developing a pipeline that runs a large-scale load test on
>>> the portable Flink runner. For these load tests, we're using the
>>> Metrics API to collect information about the job (e.g. throughput,
>>> latency, runtime, etc.).
>>>
>>> Łukasz is 1) curious about your work and interested in relying on it
>>> for the load tests, and 2) happy to help if help would be useful.
>>>
>>> Can you share what your plans are / how the work is going on supporting
>>> the Metrics API on Flink clusters?
>>> Thanks!
>>> -P.
>>>
>>
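
P.S. As promised above, a rough, untested sketch of the `instanceof`
dispatch Ryan's quoted mail describes. MetricQueryResults, MetricResult,
and MonitoringInfoMetricName are the real Beam classes, but
getMonitoringInfo() is an assumption (if the real class only exposes the
MI's identity, the MI would be rebuilt from that instead):

// Untested sketch: at job-API-send time, pick out only the metrics whose
// MetricName wraps a whole MonitoringInfo; "legacy" (namespace, name)
// metrics fail the instanceof check and stay behind.
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;

public class JobApiMetricsSketch {
  /** Collect the MonitoringInfos hiding behind MI-backed metric names. */
  public static List<MonitoringInfo> monitoringInfosFrom(MetricQueryResults results) {
    List<MonitoringInfo> toSend = new ArrayList<>();
    for (MetricResult<Long> counter : results.getCounters()) {
      MetricName name = counter.getName();
      if (name instanceof MonitoringInfoMetricName) {
        // getMonitoringInfo() is assumed, not the real v2.11 accessor.
        toSend.add(((MonitoringInfoMetricName) name).getMonitoringInfo());
      }
    }
    // Distributions and gauges would get the same treatment.
    return toSend;
  }
}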

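P.P.S. One more sketch, for the conversion flow quoted above (MIs arriving
from the SDK harness and being stored as SDK metrics): a minimal, untested
example of recovering an SDK MetricName from a user-counter MI.
MetricName.named is the real SDK factory, but the label keys used here are
assumptions; the real mapping lives in runners-core.

// Untested sketch of one step of the MI -> SDK direction: derive the SDK
// MetricName identity from an incoming MonitoringInfo's URN and labels.
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.sdk.metrics.MetricName;

public class MiNameSketch {
  public static MetricName toSdkName(MonitoringInfo mi) {
    // "NAMESPACE" and "NAME" are assumed label keys; falling back to the
    // URN keeps system metrics (which lack user-style labels) addressable.
    return MetricName.named(
        mi.getLabelsOrDefault("NAMESPACE", mi.getUrn()),
        mi.getLabelsOrDefault("NAME", ""));
  }
}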