Thanks, Ryan, for the great introduction to the topic - it helped a lot! Let
me try to pull together all the discussions we've had into this one thread.

You mentioned [1] that you had thought of something similar and asked what
problems I faced, so let me explain them here as clearly as I can:

The main trouble I had is with the *JobInvocation.runPipeline()* method. It
returns a generic *PipelineResult* that the *executorService* uses to listen
for the results (*invocationFuture*). The problem is that I run into
incompatible PipelineResult types: when I run a portable Flink job,
runPipeline returns a *FlinkRunnerResult* at runtime, which cannot be cast
to *JobServicePipelineResult*, so I get a *ClassCastException* (no wonder):

`java.lang.ClassCastException:
org.apache.beam.runners.flink.FlinkRunnerResult cannot be cast to
org.apache.beam.runners.fnexecution.jobsubmission.JobServicePipelineResult`
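
In code, the failing spot looks roughly like this (simplified - the exact
call sites are on the branch linked below):

  // JobInvocation.runPipeline() is runner-specific, so for a portable Flink
  // job it returns a FlinkRunnerResult at runtime:
  PipelineResult result = runPipeline();

  // ...but my branch then tries to keep treating it as a
  // JobServicePipelineResult, and this downcast is what blows up:
  JobServicePipelineResult serviceResult = (JobServicePipelineResult) result;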

The full sketch of the idea, showing the problem, is on my private branch
(feel free to comment): [2]

Note that I had to move *JobServicePipelineResult* to
`runners.fnexecution.jobsubmission` to make this compile, but I guess we're
fine with it living there (aren't we?).

Łukasz

[1] https://github.com/apache/beam/pull/7934#discussion_r275003333
[2] https://github.com/lgajowy/beam/commit/01bc5dfad434f09050e41f054a88c29059740804


On Fri, Mar 29, 2019 at 7:51 PM Ryan Williams <[email protected]> wrote:

>
>
> On Fri, Mar 29, 2019 at 2:44 PM Ryan Williams <[email protected]>
> wrote:
>
>> + Max, Robert, dev@
>>
>> Your assessment is correct, Łukasz! We need to get metrics sent over the
>> job API from the JobServicePipelineResult in the job service to the
>> "portable harness" that the user is running directly.
>>
>
> One correction: this should have been "from the JobInvocation in the job
> service to the JobServicePipelineResult in the 'portable harness'"
>
> Here are the job-service
> <https://github.com/apache/beam/pull/7934/files#diff-d4d147290be404469ebc926e593a187eR339>
> and portable-harness
> <https://github.com/apache/beam/pull/7934/files#diff-284acf2526fcf07c05ee44aefbd9d0c3R127>
> sides of that from #7934.
>
>
>> Robert added the RPC for that in #8018
>> <https://github.com/apache/beam/pull/8018>, but code to use it is
>> incomplete/unmerged.
>>
>> I implemented one way of doing this in #7934
>> <https://github.com/apache/beam/pull/7934>, but ultimately Alex, Robert,
>> and I decided a different approach made more sense (detailed below). No one
>> has built that yet, and I don't have time for the foreseeable future,
>> unfortunately, so it is stalled.
>>
>> Here's an explanation of the possible approaches:
>>
>> In Java, there are existing MetricResult
>> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java>
>> (s
>> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java>)
>> classes in use by "legacy" runners. However, portable metrics come from the
>> SDK harness as MonitoringInfo protos
>> <https://github.com/apache/beam/blob/v2.11.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java#L150-L161>,
>> which have a more flexible data model.
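>>
>> To make the difference concrete, here is a rough comparison (the urn and
>> label values below are illustrative, not the exact strings Beam uses):
>>
>>   // "Legacy" SDK identity for a metric: just a namespace + a name
>>   MetricName legacy = MetricName.named("myNamespace", "myCounter");
>>
>>   // A MonitoringInfo, by contrast, is (conceptually) a urn plus arbitrary
>>   // labels and a typed payload, e.g.:
>>   //   urn:    beam:metric:user                        (illustrative)
>>   //   labels: {PTRANSFORM: "ParDo(MyFn)", NAMESPACE: "myNamespace", NAME: "myCounter"}
>>   //   value:  42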
>>
>> #7915 <https://github.com/apache/beam/pull/7915> (closed) expands SDK
>> classes' underlying data-models to mirror MonitoringInfos', so that we
>> could losslessly convert between MIs and SDK reprs (and back).
>>
>> This supported a flow where:
>>
>>    - Metrics come in from SDK harness as MIs
>>    - They're converted + stored as SDK MetricResults
>>    - Later, they're sent over the job API as MIs
>>    - The portable harness receives them and converts back to SDK form
>>    (to serve SDK MetricResults queries with the existing API).
>>
>> However, converting between MI- and SDK-forms (including widening the
>> latter to make this possible) required more code and churn than folks were
>> comfortable with, and is arguably not the right design.
>>
>> Another approach would be to have metrics classes like
>> MetricsContainerStepMap
>> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java>
>> store some metrics as SDK MetricResults, and others as MIs.
>>
>> A version of this is currently somewhat hacked in, with an "unbound
>> container
>> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L50>"
>> for "system" metrics that aren't keyed off a PTransform name, and a
>> MonitoringInfoMetricName
>> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java#L32-L37>
>> that inherits MetricName but wraps a whole MI (which more closely
>> corresponds to a MetricResult; here's more discussion of how SDK classes
>> might logically map to MI fields
>> <https://docs.google.com/document/d/1m83TsFvJbOlcLfXVXprQm1B7vUakhbLZMzuRrOHWnTg/edit#heading=h.mmkhv7o7bcsb>
>> ).
>>
>> We could continue that approach, and when it's time to send metrics over
>> the job API, we'd `instanceof` the MetricNames to see which ones contain
>> MIs, and only send those over the job API.
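>>
>> Something along these lines (a sketch only - `results` would be a
>> MetricQueryResults, and the getMonitoringInfo() accessor and the target
>> collection are assumptions on my part):
>>
>>   for (MetricResult<Long> counter : results.getCounters()) {
>>     MetricName name = counter.getName();
>>     if (name instanceof MonitoringInfoMetricName) {
>>       // this result originated as a MonitoringInfo, so it can cross the
>>       // job API as an MI without any lossy conversion
>>       monitoringInfosToSend.add(((MonitoringInfoMetricName) name).getMonitoringInfo());
>>     }
>>     // plain MetricNames are "legacy" results and would need a separate path
>>   }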
>>
>> I was uneasy with that (partly because metrics will always all be in one
>> form or the other, depending on whether the runner is portable or not). I
>> would think it clearer to [use a common form across legacy and portable
>> runners] xor [route them through separate code-paths, if their models are
>> irreconcilable], but the former was decided against and the latter may also
>> be more work than it's worth.
>>
>> hth!
>>
>> -Ryan
>>
>> On Fri, Mar 29, 2019 at 1:41 PM Łukasz Gajowy <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks, Pablo, for gathering the right people in this thread. I'm indeed
>>> interested in the metrics-related work. Currently, I'm setting up
>>> infrastructure for Beam's Jenkins to be able to run portable load test jobs
>>> on a Flink cluster. Therefore I'd like to ask: what is the status of
>>> portable metrics for the Flink runner? Should I be able to read distribution
>>> and counter metrics from PipelineResult (in both the Java and Python SDKs)?
>>> If I understood the output of the load tests I ran locally correctly, the
>>> metrics do get collected from the pipelines; only reading them and returning
>>> them in the pipeline result is not there yet. Am I right here?
>>>
>>> In any case, maybe there's something you think I could implement in this
>>> area? I'd be happy to help! :)
>>>
>>> Best regards,
>>> *Łukasz Gajowy,*
>>> Polidea | Senior Software Engineer
>>>
>>> M: +48 510 254 681
>>> E: [email protected]
>>>
>>>
>>> On Mon, Mar 25, 2019 at 6:51 PM Pablo Estrada <[email protected]>
>>> wrote:
>>>
>>>> Hello everyone,
>>>> Lukasz is developing a pipeline that runs a large-scale load test on
>>>> the portable Flink runner. For these load tests, we're using the Metrics
>>>> API to collect information about the job (e.g. throughput, latency,
>>>> runtime, etc).
>>>>
>>>> Lukasz is 1) curious about your work and interested in relying on it for
>>>> the load tests, and 2) happy to help if help would be useful.
>>>>
>>>> Can you share what your plans are / how the work is going towards
>>>> supporting the metrics API on Flink clusters?
>>>> Thanks!
>>>> -P.
>>>>
>>>
