On Fri, Mar 29, 2019 at 2:44 PM Ryan Williams <[email protected]> wrote:

> + Max, Robert, dev@
>
> Your assessment is correct Łukasz! We need to get metrics sent over the
> job API from the JobServicePipelineResult in the job service to the
> "portable harness" that the user is running directly.
>

One correction: this should have been "from the JobInvocation in the job
service to the JobServicePipelineResult in the 'portable harness'"

Here are the job-service
<https://github.com/apache/beam/pull/7934/files#diff-d4d147290be404469ebc926e593a187eR339>
and portable-harness
<https://github.com/apache/beam/pull/7934/files#diff-284acf2526fcf07c05ee44aefbd9d0c3R127>
sides of that from #7934.


> Robert added the RPC for that in #8018
> <https://github.com/apache/beam/pull/8018>, but code to use it is
> incomplete/unmerged.
>
> I implemented one way of doing this in #7934
> <https://github.com/apache/beam/pull/7934>, but ultimately Alex, Robert,
> and I decided a different approach made more sense (detailed below). No one
> has built that yet, and I don't have time to do so for the foreseeable
> future, unfortunately, so it is stalled.
>
> Here's an explanation of the possible approaches:
>
> In Java, there are existing MetricResult
> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java>
> (s
> <https://github.com/apache/beam/blob/v2.11.0/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java>)
> classes in use by "legacy" runners. However, portable metrics come from the
> SDK harness as MonitoringInfo protos
> <https://github.com/apache/beam/blob/v2.11.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java#L150-L161>,
> which have a more flexible data model.
>
> #7915 <https://github.com/apache/beam/pull/7915> (closed) expands SDK
> classes' underlying data-models to mirror MonitoringInfos', so that we
> could losslessly convert between MIs and SDK reprs (and back).
>
> This supported a flow where:
>
>    - Metrics come in from SDK harness as MIs
>    - They're converted + stored as SDK MetricResults
>    - Later, they're sent over the job API as MIs
>    - The portable harness receives them and converts back to SDK form (to
>    serve SDK MetricResults queries with the existing API).
>
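
A minimal sketch of the round trip in the list above, in plain Java with no Beam dependency. Every class, field, URN, and label name here is a hypothetical stand-in (not the real MonitoringInfo proto or the SDK MetricResult classes); it only illustrates why the SDK-side form would have to carry everything the MI carries for the conversion to be lossless.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a MonitoringInfo proto: a URN, free-form labels, and a value.
final class MiSketch {
  final String urn;
  final Map<String, String> labels;
  final long value;

  MiSketch(String urn, Map<String, String> labels, long value) {
    this.urn = urn;
    this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
    this.value = value;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof MiSketch)) {
      return false;
    }
    MiSketch other = (MiSketch) o;
    return urn.equals(other.urn) && labels.equals(other.labels) && value == other.value;
  }

  @Override
  public int hashCode() {
    return Objects.hash(urn, labels, value);
  }
}

// Hypothetical "widened" SDK-side result: it mirrors the MI's fields so nothing is dropped.
final class SdkResultSketch {
  final String urn;
  final Map<String, String> labels;
  final long value;

  SdkResultSketch(String urn, Map<String, String> labels, long value) {
    this.urn = urn;
    this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
    this.value = value;
  }
}

final class RoundTripSketch {
  // Steps 1-2: a metric arrives from the SDK harness as an MI and is stored in SDK form.
  static SdkResultSketch toSdk(MiSketch mi) {
    return new SdkResultSketch(mi.urn, mi.labels, mi.value);
  }

  // Step 3: the stored SDK form is turned back into an MI to send over the job API.
  static MiSketch toMi(SdkResultSketch result) {
    return new MiSketch(result.urn, result.labels, result.value);
  }

  public static void main(String[] args) {
    Map<String, String> labels = new LinkedHashMap<>();
    labels.put("EXAMPLE_LABEL", "step1"); // made-up label key and value
    MiSketch fromHarness = new MiSketch("example:urn:element_count", labels, 42L);
    MiSketch overJobApi = toMi(toSdk(fromHarness));
    System.out.println("lossless round trip: " + fromHarness.equals(overJobApi));
  }
}
```

If the SDK-side class kept fewer fields than the MI (say, no label map), `toMi` could not rebuild the original, which is the widening cost discussed next.
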
> However, converting between MI- and SDK-forms (including widening the
> latter to make this possible) required more code and churn than folks were
> comfortable with, and is arguably not the right design.
>
> Another approach would be to have metrics classes like
> MetricsContainerStepMap
> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java>
> store some metrics as SDK MetricResults, and others as MIs.
>
> A version of this is currently somewhat hacked in, with an "unbound
> container
> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java#L50>"
> for "system" metrics that aren't keyed off a PTransform name, and a
> MonitoringInfoMetricName
> <https://github.com/apache/beam/blob/v2.11.0/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoMetricName.java#L32-L37>
> that inherits MetricName but wraps a whole MI (which more closely
> corresponds to a MetricResult; here's more discussion of how SDK classes
> might logically map to MI fields
> <https://docs.google.com/document/d/1m83TsFvJbOlcLfXVXprQm1B7vUakhbLZMzuRrOHWnTg/edit#heading=h.mmkhv7o7bcsb>
> ).
>
> We could continue that approach, and when it's time to send metrics over
> the job API, we'd `instanceof` the MetricNames to see which ones contain
> MIs, and only send those over the job API.
>
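
A rough sketch of the `instanceof` filtering described above, again with hypothetical stand-in classes rather than Beam's real MetricName and MonitoringInfoMetricName. The wrapped MI is reduced to a URN string purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the SDK's MetricName (namespace + name).
class NameSketch {
  final String namespace;
  final String name;

  NameSketch(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }
}

// Hypothetical stand-in for MonitoringInfoMetricName: a name subclass that wraps an MI.
// Here the "MI" is just its URN string.
class MiNameSketch extends NameSketch {
  final String wrappedMiUrn;

  MiNameSketch(String namespace, String name, String wrappedMiUrn) {
    super(namespace, name);
    this.wrappedMiUrn = wrappedMiUrn;
  }
}

final class InstanceofFilterSketch {
  // Keep only the names that wrap an MI; those are the ones that would go over the job API.
  static List<String> urnsToSend(List<NameSketch> names) {
    List<String> toSend = new ArrayList<>();
    for (NameSketch name : names) {
      if (name instanceof MiNameSketch) {
        toSend.add(((MiNameSketch) name).wrappedMiUrn);
      }
    }
    return toSend;
  }

  public static void main(String[] args) {
    List<NameSketch> names = new ArrayList<>();
    names.add(new NameSketch("ns", "legacyCounter"));
    names.add(new MiNameSketch("ns", "portableCounter", "example:urn:portable_counter"));
    System.out.println(urnsToSend(names));
  }
}
```

The type check is what makes the container hold two kinds of entries at once, which is the source of the unease described below.
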
> I was uneasy with that (partly because metrics will always all be in one
> form or the other, depending on whether the runner is portable or not). I
> would think it clearer to [use a common form across legacy and portable
> runners] xor [route them through separate code-paths, if their models are
> irreconcilable], but the former was decided against and the latter may also
> be more work than it's worth.
>
> hth!
>
> -Ryan
>
> On Fri, Mar 29, 2019 at 1:41 PM Łukasz Gajowy <[email protected]>
> wrote:
>
>> Hello,
>>
>> Thanks, Pablo, for gathering the right people in this thread. I'm indeed
>> interested in metrics-related work. Currently, I'm setting up
>> infrastructure for Beam's Jenkins to be able to run portable load test jobs
>> on a Flink cluster. Therefore I'd like to ask: what is the status of
>> portable metrics for the Flink runner? Should I be able to read
>> distribution and counter metrics from PipelineResult (in both Java and
>> Python SDKs)? If I understood the output from the load tests I ran locally,
>> the metrics get collected from pipelines; only reading them and returning
>> them in the pipeline result is not there yet. Am I right here?
>>
>> In any case, maybe there's something you think I could implement in this
>> area? I'd be happy to help! :)
>>
>> Best regards,
>> *Łukasz Gajowy,*
>> Polidea | Senior Software Engineer
>>
>> M: +48 510 254 681
>> E: [email protected]
>>
>>
>> On Mon, Mar 25, 2019 at 6:51 PM Pablo Estrada <[email protected]> wrote:
>>
>>> Hello everyone,
>>> Lukasz is developing a pipeline that runs a large-scale load test on the
>>> portable Flink runner. For these load tests, we're using the Metrics API to
>>> collect information about the job (e.g. throughput, latency, runtime, etc).
>>>
>>> Lukasz is 1) curious about your work, interested in relying on it for
>>> the load tests, and 2) happy to help if help would be useful.
>>>
>>> Can you share your plans, and how the work to support the Metrics API
>>> on Flink clusters is going?
>>> Thanks!
>>> -P.
>>>
>>
