On Sun, Nov 10, 2019 at 5:06 PM Chad Dombrova <chad...@gmail.com> wrote:

> Hi,
>
>> You can see that each JobMessagesResponse may contain a message *or* a
>>> GetJobStateResponse.
>>>
>>> What’s the intention behind this design?
>>>
>> I believe this was because a user may want to listen to both job state
>> and messages all in one stream.
>>
>
> Just to be crystal clear, what's the advantage of using a single stream
> versus two?
>

gRPC guarantees that messages on a stream are delivered in order, and it's
purely a convenience, since you would likely want to know about both job
state changes and any important messages from the runner. I'm sure that this
could be changed, though.
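
To illustrate the ordering point, here is a minimal sketch of a client reading
one stream whose items carry either a message or a state change. StreamItem
and events() are hypothetical stand-ins invented for this example; they are
not the generated JobMessagesResponse class or any Beam API.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Tuple


@dataclass
class StreamItem:
    """Hypothetical stand-in for a response where exactly one field is set."""
    message: Optional[str] = None
    state: Optional[str] = None


def events(stream: Iterable[StreamItem]) -> Iterator[Tuple[str, str]]:
    """Yield ("state" | "message", payload) pairs in arrival order."""
    for item in stream:
        if item.state is not None:
            yield ("state", item.state)
        elif item.message is not None:
            yield ("message", item.message)


# Both kinds of event share one ordered stream, so the client sees the error
# before the FAILED transition without merging two separate streams.
demo = [
    StreamItem(state="RUNNING"),
    StreamItem(message="worker lost"),
    StreamItem(state="FAILED"),
]
timeline = list(events(demo))
```

With two separate streams, the client would have to interleave them itself,
and the relative order of a message and a state change would not be
guaranteed.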

> The reason this is important to me is I’d like to make a handful of
>>> changes to GetMessageStream to make it more powerful:
>>>
>>>    - propagate messages from user code (if they opt in to setting up
>>>    their logger appropriately). Currently, AFAICT, the only message the
>>>    message stream delivers is a final error, if the job fails (other than
>>>    state changes). It was clearly the original intent of this endpoint to
>>>    carry other types of messages, and I'd like to bring that to fruition.
>>>
>>> Log messages are a lot of data; we do have users writing GBs/s when
>> aggregated across all their machines in Google Cloud, so I'm not sure this
>> will scale without a lot of control over filtering. Users sometimes don't
>> realize how much they are logging, and if you have 1000 VMs each writing
>> only a few lines at a time you can easily saturate this stream.
>>
>
> Yes, we're concerned about message volume as well.  The plan would be to
> add filters, which could be propagated from the job server to the logger on
> the runner and sdk (if they support it) to avoid over-saturating the
> stream.  For example, the log-level right now is basically ERROR, so we'd
> propagate that to the runner and it would only send error messages back to
> the job server.  Thus, we should hopefully be able to roll out this feature
> without much change to the end user.  They could then opt-in to higher
> volume message levels, if desired.
>
> Some possible filters could be:
>
>    - job id (required)
>    - log level (default=ERROR)
>    - transform id(s) (optional. defaults to just runner messages)
>    - a jsonpath <https://github.com/json-path/JsonPath> selector for
>    filtering on message metadata?
>
> I think a logging implementation would consist of 2 parts:  the logging
> service (i.e. an implementation of GetMessageStream) and the logging
> handler for emitting messages from the runner and optionally user
> transforms.  Handlers would need to be implemented for each SDK (i.e.
> language).
>
> The default logging implementation would consist of the InMemoryJobService
> on the servicer side, which would send the filter object to the handler.
>  The handler would pre-filter messages and stream them back to the standard
> job service, which would simply forward on everything it receives, as it
> does now.
>
> A StackDriver logging service would be a bit different.  Its logging
> handler might send *everything* to StackDriver so that there's a complete
> record that can be sifted through later.  Its servicer component would
> interpret the filter object into a StackDriver filter string and create a
> subscription with StackDriver.
>
> In this way we could support both semi-persistent logging services with a
> queryable history (like StackDriver) and completely transient message
> streams like we have now.
>
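
The pre-filtering handler described above could look roughly like the sketch
below, built on Python's standard logging module. MessageFilter,
PreFilteringHandler, and the transform_id record attribute are all
hypothetical names chosen for illustration; nothing here is an existing Beam
class. The list named sent stands in for the stream back to the job service.

```python
import logging
from dataclasses import dataclass
from typing import FrozenSet, List, Optional


@dataclass(frozen=True)
class MessageFilter:
    """Hypothetical filter object; field names are illustrative only."""
    job_id: str
    min_level: int = logging.ERROR
    # An empty set means "runner messages only" (records with no transform id).
    transform_ids: FrozenSet[str] = frozenset()

    def accepts(self, record: logging.LogRecord) -> bool:
        if record.levelno < self.min_level:
            return False
        transform_id: Optional[str] = getattr(record, "transform_id", None)
        return transform_id is None or transform_id in self.transform_ids


class PreFilteringHandler(logging.Handler):
    """Drops records locally so only matching ones would reach the stream."""

    def __init__(self, message_filter: MessageFilter) -> None:
        super().__init__()
        self.message_filter = message_filter
        self.sent: List[str] = []  # stands in for the stream to the job service

    def emit(self, record: logging.LogRecord) -> None:
        if self.message_filter.accepts(record):
            self.sent.append(record.getMessage())


handler = PreFilteringHandler(MessageFilter(job_id="job-1"))
log = logging.getLogger("beam_filter_sketch")
log.setLevel(logging.DEBUG)
log.propagate = False
log.addHandler(handler)

log.info("starting bundle")                                # below ERROR: dropped
log.error("runner failure")                                # runner-level: kept
log.error("bad record", extra={"transform_id": "ParDo1"})  # not opted in: dropped
```

With the defaults, only the runner-level error is forwarded, which matches
the two opt-in steps: lowering min_level and listing transform ids.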

I see, only in the former case would you push the logging configuration to
the SDK.
I was interested in being able to have dynamic logging configuration for
the SDK so that users would be able to change the log levels of their
pipeline from a UI while it's running. A common use case is that you have
some long-running streaming pipeline that has started to fail on some piece
of data and you want to increase the log level to get more details. The
LogControl message was meant to enable such a feature[1].
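
As a rough sketch of what applying such a change inside a running Python SDK
process might involve, assuming a control message that names a logger and a
new level: LogLevelUpdate and apply_update are hypothetical and do not
reflect the actual contents of the LogControl proto.

```python
import logging
from dataclasses import dataclass


@dataclass
class LogLevelUpdate:
    """Hypothetical control message; not the actual LogControl proto."""
    logger_name: str
    level: str  # e.g. "DEBUG", "INFO", "ERROR"


def apply_update(update: LogLevelUpdate) -> None:
    """Change a logger's level while the process keeps running.

    Logger.setLevel accepts a level name and raises ValueError if the
    name is unknown.
    """
    logging.getLogger(update.logger_name).setLevel(update.level)


pipeline_log = logging.getLogger("my_pipeline.parse")
pipeline_log.setLevel(logging.ERROR)
before = pipeline_log.isEnabledFor(logging.DEBUG)

# Simulate the UI asking for more detail on a failing step.
apply_update(LogLevelUpdate("my_pipeline.parse", "DEBUG"))
after = pipeline_log.isEnabledFor(logging.DEBUG)
```

Here before is False and after is True, without restarting the process.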

>
>>>    - make it possible to back GetMessageStream with logging services
>>>    like StackDriver, CloudWatch, or Elasticsearch
>>>
>>> That is interesting, originally the message stream was designed around
>> system messages from the runner and not specifically around user log
>> messages due to volume concerns. All logging integration to my knowledge
>> has been deferred to the client libraries for those specific services.
>>
>
> What we're after is a user experience akin to what the Dataflow UI
> provides: view a pipeline, open the log console, and view recent messages
> from the runner.  Click on a transform to view messages emitted by that
> transform.  We've found Flink's logging and log UI to be sorely lacking and
> we like the idea of tackling this problem at the Beam level, especially
> considering so much of what we want is already there in some form.
>
> Another use case that I think would benefit from this is providing custom
> progress messages to users who launch a batch job from a shell, since the
> message stream is already emitted there.   Of course, you'd have to be
> careful about message volume, but as I mentioned there would be 2 levels
> where you'd need to opt in:
>
>    - changing log level from its default (ERROR)
>    - setting up transform-level logging
>
>
> -chad
>
>
1:
https://github.com/apache/beam/blob/0178652b6c5b2e34f8b1a562e2d398eebe63587a/model/fn-execution/src/main/proto/beam_fn_api.proto#L791
