Thanks, Ryan, for a great introduction to the topic - it helped a lot! Let me try to fuse all the discussions we've had into this one thread.
You mentioned [1] that you had thought of something similar and asked what problems I faced, so let me explain here as clearly as I can. The main trouble I had is with the *JobInvocation.runPipeline()* method. It returns a generic *PipelineResult* that the *executorService* uses to listen for the results (*invocationFuture*). The problem is that I run into incompatible PipelineResult types: when I run a portable Flink job, runPipeline returns a *FlinkRunnerResult* at runtime, and that cannot be cast to *JobServicePipelineResult*, so (no wonder) I get a *ClassCastException*:

`java.lang.ClassCastException: org.apache.beam.runners.flink.FlinkRunnerResult cannot be cast to org.apache.beam.runners.fnexecution.jobsubmission.JobServicePipelineResult`

Here's a brief, minimalistic sketch of the idea, showing the problem, on my private branch (feel free to comment): [2]. Note that I had to move *JobServicePipelineResult* to `runners.fnexecution.jobsubmission` to make this compile, but I guess we're fine with it living there (aren't we?).

Łukasz

[1] https://github.com/apache/beam/pull/7934#discussion_r275003333
[2] https://github.com/lgajowy/beam/commit/01bc5dfad434f09050e41f054a88c29059740804
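To make the failure mode concrete without clicking through [2], here is a minimal, self-contained sketch. The types below are simplified stand-ins for Beam's real classes, and the wrapping alternative at the end is only one possible direction, not merged code:

```java
// Simplified stand-ins for the real Beam types (illustration only).
interface PipelineResult {}

// Sibling implementations of PipelineResult: neither extends the other,
// so a cast between them can never succeed.
class FlinkRunnerResult implements PipelineResult {}

class JobServicePipelineResult implements PipelineResult {
  private final PipelineResult runnerResult;

  // Holding the runner's result instead of casting to it is one way out.
  JobServicePipelineResult(PipelineResult runnerResult) {
    this.runnerResult = runnerResult;
  }
}

public class CastSketch {
  // Stand-in for JobInvocation.runPipeline(): declared with the generic
  // return type, but produces a runner-specific result at runtime.
  static PipelineResult runPipeline() {
    return new FlinkRunnerResult();
  }

  public static void main(String[] args) {
    PipelineResult result = runPipeline();

    // This is the line that throws -- a FlinkRunnerResult is not a
    // JobServicePipelineResult:
    // JobServicePipelineResult cast = (JobServicePipelineResult) result;

    // Wrapping avoids the cast entirely:
    JobServicePipelineResult wrapped = new JobServicePipelineResult(result);
    System.out.println("Wrapped a " + result.getClass().getSimpleName());
  }
}
```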
On Fri, Mar 29, 2019 at 7:51 PM Ryan Williams <[email protected]> wrote:

> On Fri, Mar 29, 2019 at 2:44 PM Ryan Williams <[email protected]> wrote:
>
>> + Max, Robert, dev@
>>
>> Your assessment is correct, Łukasz! We need to get metrics sent over the
>> job API from the JobServicePipelineResult in the job service to the
>> "portable harness" that the user is running directly.
>
> One correction: this should have been "from the JobInvocation in the job
> service to the JobServicePipelineResult in the 'portable harness'".
>
> Here are the job-service
> <https://github.com/apache/beam/pull/7934/files#diff-d4d147290be404469ebc926e593a187eR339>
> and portable-harness
> <https://github.com/apache/beam/pull/7934/files#diff-284acf2526fcf07c05ee44aefbd9d0c3R127>
> sides of that from #7934.
>
>> Robert added the RPC for that in #8018
>> <https://github.com/apache/beam/pull/8018>, but the code to use it is
>> incomplete/unmerged.
>>
>> I implemented one way of doing this in #7934
>> <https://github.com/apache/beam/pull/7934>, but ultimately Alex, Robert,
>> and I decided a different approach made more sense (detailed below). No one
>> has built that yet, and I don't have time for the foreseeable future,
>> unfortunately, so it is stalled.
>>
>> Here's an explanation of the possible approaches:
>>
>> In Java, there are existing MetricResult
>> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java>
>> and MetricResults
>> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java>
>> classes in use by "legacy" runners. However, portable metrics come from the
>> SDK harness as MonitoringInfo protos
>> <https://github.com/apache/beam/blob/v2.11.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java#L150-L161>,
>> which have a more flexible data model.
>>
>> #7915 <https://github.com/apache/beam/pull/7915> (closed) expands the SDK
>> classes' underlying data models to mirror MonitoringInfos', so that we
>> could losslessly convert between MIs and SDK representations (and back).
>> This supported a flow where:
>>
>> - Metrics come in from the SDK harness as MIs
>> - They're converted to and stored as SDK MetricResults
>> - Later, they're sent over the job API as MIs
>> - The portable harness receives them and converts them back to SDK form
>>   (to serve SDK MetricResults queries with the existing API)
>>
>> However, converting between MI and SDK forms (including widening the
>> latter to make this possible) required more code and churn than folks were
>> comfortable with, and is arguably not the right design.
>>
>> Another approach would be to have metrics classes like
>> MetricsContainerStepMap
>> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java>
>> store some metrics as SDK MetricResults and others as MIs.
>>
>> A version of this is currently somewhat hacked in, with an "unbound
>> container
>> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L50>"
>> for "system" metrics that aren't keyed off a PTransform name, and a
>> MonitoringInfoMetricName
>> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java#L32-L37>
>> that inherits from MetricName but wraps a whole MI (which corresponds more
>> closely to a MetricResult; here's more discussion of how SDK classes
>> might logically map to MI fields
>> <https://docs.google.com/document/d/1m83TsFvJbOlcLfXVXprQm1B7vUakhbLZMzuRrOHWnTg/edit#heading=h.mmkhv7o7bcsb>
>> ).
>>
>> We could continue that approach, and when it's time to send metrics over
>> the job API, we'd `instanceof` the MetricNames to see which ones contain
>> MIs, and only send those over the job API.
>>
>> I was uneasy with that (partly because the metrics will always all be in
>> one form or the other, depending on whether the runner is portable or not).
>> I would think it clearer to [use a common form across legacy and portable
>> runners] xor [route them through separate code paths, if their models are
>> irreconcilable], but the former was decided against and the latter may also
>> be more work than it's worth.
>>
>> hth!
>>
>> -Ryan
>>
>> On Fri, Mar 29, 2019 at 1:41 PM Łukasz Gajowy <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> Thanks, Pablo, for gathering the right people in this thread. I'm indeed
>>> interested in metrics-related work. Currently, I'm setting up
>>> infrastructure for Beam's Jenkins to be able to run portable load-test
>>> jobs on a Flink cluster. Therefore I'd like to ask: what is the status of
>>> portable metrics for the Flink runner? Should I be able to read
>>> distribution and counter metrics from PipelineResult (in both the Java and
>>> Python SDKs)? If I understood the output of the load tests I ran locally
>>> correctly, the metrics do get collected from the pipelines; only reading
>>> them and returning them to the pipeline result is not there yet. Am I
>>> right?
>>>
>>> In any case, maybe there's something you think I could implement in this
>>> area? I'd be happy to help! :)
>>>
>>> Best regards,
>>> *Łukasz Gajowy,*
>>> Polidea | Senior Software Engineer
>>>
>>> M: +48 510 254 681
>>> E: [email protected]
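To make that question concrete: on a non-portable runner, reading a counter back from PipelineResult looks like this today (a minimal sketch using the Java SDK's existing metrics API; the `pipeline` variable and the "loadtest"/"runtime" metric names are assumptions for illustration). The missing piece for portability is getting these values populated over the job API:

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

// Assumes `pipeline` is an already-constructed Pipeline whose DoFns report
// a counter named "runtime" in the "loadtest" namespace (illustrative names).
PipelineResult result = pipeline.run();
result.waitUntilFinish();

// Query the counters matching the given namespace/name from the result.
MetricQueryResults metrics =
    result
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("loadtest", "runtime"))
                .build());

// Print the attempted value of each matching counter.
for (MetricResult<Long> counter : metrics.getCounters()) {
  System.out.println(counter.getName() + " = " + counter.getAttempted());
}
```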
>>> On Mon, Mar 25, 2019 at 6:51 PM Pablo Estrada <[email protected]> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> Łukasz is developing a pipeline that runs a large-scale load test on
>>>> the portable Flink runner. For these load tests, we're using the Metrics
>>>> API to collect information about the job (e.g. throughput, latency,
>>>> runtime).
>>>>
>>>> Łukasz is 1) curious about your work and interested in relying on it for
>>>> the load tests, and 2) happy to help if help would be useful.
>>>>
>>>> Can you share what your plans are / how the work is going on supporting
>>>> the Metrics API on Flink clusters?
>>>>
>>>> Thanks!
>>>> -P.
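For reference, the kind of instrumentation Pablo mentions -- counters and distributions reported through the Metrics API -- looks roughly like this in the Java SDK (a generic sketch, not the actual load-test code; the namespace and metric names are illustrative):

```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// A DoFn instrumented with the Metrics API: a counter for throughput-style
// accounting and a distribution for per-element latency.
class InstrumentedFn extends DoFn<byte[], byte[]> {
  private final Counter bytesProcessed = Metrics.counter("loadtest", "bytesProcessed");
  private final Distribution latencyMs = Metrics.distribution("loadtest", "latencyMs");

  @ProcessElement
  public void processElement(ProcessContext c) {
    long start = System.currentTimeMillis();
    byte[] element = c.element();
    bytesProcessed.inc(element.length);
    c.output(element);
    latencyMs.update(System.currentTimeMillis() - start);
  }
}
```

On a legacy runner these values surface through PipelineResult.metrics() as shown earlier; the thread above is about wiring the same thing up for portable runners.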
