[ https://issues.apache.org/jira/browse/SAMZA-348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167573#comment-14167573 ]

Chris Riccomini commented on SAMZA-348:
---------------------------------------

Known remaining issues with the proposed design:

# Message payload format (K:V vs. K:{K:V, ...})
# How does the control-job.sh script use the SystemConsumer/SystemProducer?
# How will this work in a dev environment?

I will address these in order.

*Message payload format*

The current design models ConfigStream messages as simple key-value pairs. The 
downside to this approach is that it breaks atomicity for a StreamTask's 
checkpoint, since a single checkpoint requires multiple messages--one per 
SSP:offset pair.

There are two ways to solve this: (1) depend on transactionality, or (2) 
support a nested message payload format (K:{K:V, ...}). With nested payloads, 
all offset checkpoints for a single task can be written in one message, thus 
maintaining atomic commits for all checkpoints within a single task. The 
nested-payload approach is how we checkpoint today. Its downsides are:

# The single offset checkpoint message will be much larger than any individual 
offset checkpoint message in approach (1).
# Modifying an offset checkpoint requires the job coordinator to do a 
read-modify-write, which is more complicated than the simple put that would be 
required for approach (1).
# It muddles the data model a little bit.

The problem with (1) is mainly that it depends on transactionality. Without 
it, a failure could occur halfway through a task checkpoint, in which case 
some input streams would fall back to older offsets and others would not. I 
tend to agree with Martin's assessment of the problem:

bq. My hunch is that it's not a big problem, since state changelogs are 
currently also not atomically tied to the checkpoints either, so the semantics 
of container restart are somewhat vague already.

But given that it should be fairly trivial to solve this using nested payloads, 
we might as well do so. We can always clean it up later, if transactionality 
becomes commonplace.
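
To make the two formats concrete, here is a rough sketch of the payloads. The 
stream names, offsets, and key layout below are invented for illustration and 
aren't pinned down by the design document:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class CheckpointPayloadSketch {
  public static void main(String[] args) {
    // Approach (1): flat K:V -- one message per SSP:offset pair. A failure
    // between the two sends leaves the task checkpoint half-written.
    Map<String, String> flat1 = Map.of("kafka.PageViewEvent.0", "1234");
    Map<String, String> flat2 = Map.of("kafka.AdClickEvent.0", "5678");

    // Approach (2): nested K:{K:V, ...} -- every SSP:offset pair for the
    // task rides in a single message, so the checkpoint commits atomically.
    // The trade-offs: a bigger message, and a read-modify-write to update
    // any single offset.
    Map<String, String> allOffsets = new LinkedHashMap<>();
    allOffsets.put("kafka.PageViewEvent.0", "1234");
    allOffsets.put("kafka.AdClickEvent.0", "5678");
    Map<String, Map<String, String>> nested =
        Map.of("checkpoint:Partition 0", allOffsets);

    System.out.println(flat1 + " " + flat2 + " vs. " + nested);
  }
}
{code}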

*How does the control-job.sh script use the SystemConsumer/SystemProducer?*

This is a tricky one. Given that Samza has a SystemConsumer/SystemProducer API, 
it seems ideal to have the ConfigStream implementation use these interfaces for 
reading/writing config. In the design document, I glossed over how the job 
coordinator and control-job.sh script know how to translate a URI into a Config 
for SystemConsumer/SystemProducer. This is a bit of a chicken-and-egg problem: 
the control-job.sh script needs to know how to write to the ConfigStream, but 
in order to do that, it needs a Config to pass to SystemFactory when getting 
the producer/consumer.

Two potential solutions that I can think of are:

# Introduce a SAMZA\_HOME environment variable that points to a directory 
containing a conf/samza-site.properties configuration file.
# Add a SystemFactory.getConfig(URI uri) interface.

Introducing a SAMZA\_HOME environment variable seems very heavy-handed. It 
would have to be set on every node in the YARN cluster (since the job 
coordinator could run on any node), as well as on the machine that 
control-job.sh runs on. That will be hard to operate, may be (Samza) 
version-dependent, and seems kind of clunky.

Adding a getConfig() API seems mildly hacky. The main problem with this 
approach is determining which SystemFactory to use based on the URI. We could 
do something as simple as Class.forName(uri.getScheme() + 
"SystemFactory").newInstance(). This is a bit fragile and dangerous, but it 
should work, and it maintains pluggability.
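
To make that concrete, here's a sketch under the assumption that getConfig(URI) 
were added. The ConfigAwareSystemFactory interface below is a stand-in for the 
proposed API, not anything that exists in Samza today:

{code:java}
import java.net.URI;
import java.util.Map;

// Stand-in for the proposed SystemFactory.getConfig(URI) addition; no such
// method exists in Samza today.
interface ConfigAwareSystemFactory {
  Map<String, String> getConfig(URI uri);
}

public class ConfigStreamBootstrap {
  public static Map<String, String> configFromUri(URI uri) throws Exception {
    // The naive convention from above: a "kafka://..." URI resolves the
    // class named "kafkaSystemFactory". A real implementation would need a
    // mapping from scheme to a fully qualified class name.
    String className = uri.getScheme() + "SystemFactory";
    ConfigAwareSystemFactory factory = (ConfigAwareSystemFactory)
        Class.forName(className).getDeclaredConstructor().newInstance();
    return factory.getConfig(uri);
  }
}
{code}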

Does anyone else have any other ideas for this?

*How will this work in a dev environment?*

It's relatively easy to start a Samza job locally right now using the 
ThreadJobFactory or ProcessJobFactory: config can be passed in via the 
constructor, and no Kafka grid is required. Under the new design proposal, it 
seems that developers will be required to have a Kafka grid (or some 
equivalent system implementation--HBase, or whatever) to store their 
configuration. There doesn't seem to be much of a way around this, unless a 
FileSystemConsumer/FileSystemProducer could be made to work as the backing 
system for the ConfigStream (which seems possible at first glance).
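
For illustration, a file-backed setup might look something like the following 
properties. The "file" system, its factory class, and the job.config.* keys 
are all invented here--only ThreadJobFactory is a real class--so this is 
purely a sketch of the idea:

{code}
# Hypothetical local-dev config: back the ConfigStream with a flat file so
# no Kafka grid is needed. None of these keys or the file system factory
# exist in Samza today.
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
systems.file.samza.factory=samza.system.file.FileSystemFactory
job.config.system=file
job.config.stream=/tmp/my-job/config-stream.log
{code}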

> Configure Samza jobs through a stream
> -------------------------------------
>
>                 Key: SAMZA-348
>                 URL: https://issues.apache.org/jira/browse/SAMZA-348
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.7.0
>            Reporter: Chris Riccomini
>              Labels: project
>         Attachments: DESIGN-SAMZA-348-0.md, DESIGN-SAMZA-348-0.pdf, 
> DESIGN-SAMZA-348-1.md, DESIGN-SAMZA-348-1.pdf
>
>
> Samza's existing config setup is problematic for a number of reasons:
> # It's completely immutable once a job starts. This prevents any dynamic 
> reconfiguration and auto-scaling. It is debatable whether we want these 
> features or not, but our existing implementation actively prevents them. See 
> SAMZA-334 for discussion.
> # We pass existing configuration through environment variables. YARN exports 
> environment variables in a shell script, which limits their size to the 
> maximum argument-list length (ARG_MAX) on the machine. This is usually 
> ~128KB. See SAMZA-333 and SAMZA-337 
> for details.
> # User-defined configuration (the Config object) and programmatic 
> configuration (checkpoints and TaskName:State mappings (see SAMZA-123)) are 
> handled differently. It's debatable whether this makes sense.
> In SAMZA-123, [~jghoman] and I propose implementing a ConfigLog. This log 
> would replace both the checkpoint topic and the existing config environment 
> variables in SamzaContainer and Samza's YARN AM.
> I'd like to keep this ticket's scope limited to just the implementation of 
> the ConfigLog, and not re-designing how Samza's config is used in the code 
> (SAMZA-40). We should, however, discuss how this feature would affect dynamic 
> reconfiguration/auto-scaling.


