> You can see that each JobMessagesResponse may contain a message *or* a
>> GetJobStateResponse.
>> What’s the intention behind this design?
> I believe this was because a user may want to listen to both job state and
> messages all in one stream.

Just to be crystal clear, what's the advantage of using a single stream
versus two?

> The reason this is important to me is I’d like to make a handful of
>> changes to GetMessageStream to make it more powerful:
>>    - propagate messages from user code (if they opt in to setting up
>>    their logger appropriately). currently, AFAICT, the only message the
>>    message stream delivers is a final error, if the job fails (other than
>>    state changes). It was clearly the original intent of this endpoint to
>>    carry other types of messages, and I'd like to bring that to fruition.
>> Log messages is a lot of data, we do have users writing GBs/s when
> aggregated across all their machines in Google Cloud so not sure if this
> will scale without a lot of control on filtering. Users sometimes don't
> recognize how much they are logging and if you have a 1000 VMs each writing
> only a few lines at a time you can easily saturate this stream.

Yes, we're concerned about message volume as well.  The plan would be to
add filters, which could be propagated from the job server to the logger on
the runner and sdk (if they support it) to avoid over-saturating the
stream.  For example, the log-level right now is basically ERROR, so we'd
propagate that to the runner and it would only send error messages back to
the job server.  Thus, we should hopefully be able to roll out this feature
without much change to the end user.  They could then opt-in to higher
volume message levels, if desired.

Some possible filters could be:

   - job id (required)
   - log level (default=ERROR)
   - transform id(s) (optional. defaults to just runner messages)
   - a jsonpath <https://github.com/json-path/JsonPath> selector for
   filtering on message metadata?

I think a logging implementation would consist of 2 parts:  the logging
service (i.e. an implementation of GetMessageStream) and the logging
handler for emitting messages from the runner and optionally user
transforms.  Handlers would need to be implemented for each SDK (i.e.

The default logging implementation would consist of the InMemoryJobService
on the servicer side, which would send the filter object to the handler.
 The handler would pre-filter messages and stream them back to the standard
job service, which would simply forward on everything it receives, as it
does now.

A StackDriver logging service would be a bit different.  Its logging
handler might send *everything* to StackDriver so that there's a complete
record that can be sifted through later.  Its servicer component would
interpret the filter object into a StackDriver filter string and create a
subscription with StackDriver.

In this way we could support both semi-persistent logging services with a
queryable history (like StackDriver) and completely transient message
streams like we have now.

>>    - make it possible to back GetMessageStream with logging services
>>    like StackDriver, CloudWatch, or Elasticsearch
>> That is interesting, originally the message stream was designed around
> system messages from the runner and not specifically around users log
> messages due to volume concerns. All logging integration to my knowledge
> has been deferred to the client libraries for those specific services.

What we're after is a user experience akin to what the Dataflow UI
provides: view a pipeline, open the log console, and view recent messages
from the runner.  click on a transform to view messages emitted by that
transform.  We've found Flink's logging and log UI to be sorely lacking and
we like the idea of tackling this problem at the Beam level, especially
considering so much of what we want is already there in some form.

Another use case that I think would benefit from this is providing custom
progress messages to users who launch a batch job from a shell, since the
message stream is already emitted there.   Of course, you'd have to be
careful about message volume, but as I mentioned there would be 2 levels
where you'd need to opt in:

   - changing log level from its default (ERROR)
   - setting up transform-level logging


Reply via email to