Status
Current state: UNDER DISCUSSION
...
JIRA: SAMZA-1554
Released:
Problem
The Samza framework enables its users to build stateful stream processing applications, that is, applications that remember information about past events in a local state store and use that information to influence the processing of future events from the stream. Local state is a fundamental, enabling concept in stream processing and is essential to many common use cases such as stream-stream joins, stream-table joins and windowing.
Every stream application in Samza has many task instances, each of which runs a custom user-defined function to process events from a stream. Each task instance has one or more associated local stores. A task instance's local store is backed up to a log-compacted Kafka topic referred to as the changelog. When a task instance commits, the incremental updates to its local stores are flushed to the changelog topic. When a task instance runs on a host that doesn't have the latest copy of its local store, the store is restored by replaying messages from the changelog stream. For large stateful jobs this restoration phase can take a long time, which prevents the application from starting up and processing events from its input streams.
Host affinity is a feature that maintains stickiness between a task and a physical host. It offers a best-effort guarantee that a task instance will be assigned to the same physical host it ran on before. This document discusses some potential approaches to support this feature in the standalone deployment model.
Requirements
Goals
- Support stateful stream processing in standalone stream applications.
- Minimize partition movements amongst stateful processors in the rebalance phase.
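The second goal, minimizing partition movement during a rebalance, can be illustrated with a minimal sticky-assignment sketch. It assumes that the previous generation's task-to-processor mapping is available as a plain map and that a processor id identifies the same host across restarts; the class and method names are hypothetical, and this is not the proposed Samza implementation. Each task stays on its previous processor while that processor is live and under a ceil(tasks / processors) cap; only the remaining tasks move, each to the currently least-loaded processor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Samza's API: sticky (host-affinity-style) assignment of tasks
// to live processors. Processor ids are assumed to be stable for a given host across restarts.
class StickyAssignmentSketch {

  static Map<String, String> assign(List<String> tasks,
                                    Map<String, String> previous,
                                    List<String> liveProcessors) {
    if (liveProcessors.isEmpty()) {
      throw new IllegalArgumentException("at least one live processor is required");
    }
    // Ceiling of tasks / processors: a per-processor cap so stickiness cannot skew the balance.
    int capacity = (tasks.size() + liveProcessors.size() - 1) / liveProcessors.size();
    // Current load per live processor, iterated in sorted id order for deterministic tie-breaking.
    Map<String, Integer> load = new TreeMap<>();
    for (String processor : liveProcessors) {
      load.put(processor, 0);
    }

    Map<String, String> assignment = new TreeMap<>();
    List<String> unassigned = new ArrayList<>();
    // Pass 1: keep each task on its previous processor if that processor is live and under the cap.
    for (String task : tasks) {
      String prev = previous.get(task);
      if (prev != null && load.containsKey(prev) && load.get(prev) < capacity) {
        assignment.put(task, prev);
        load.put(prev, load.get(prev) + 1);
      } else {
        unassigned.add(task);
      }
    }
    // Pass 2: move only the remaining tasks, each to the currently least-loaded processor.
    for (String task : unassigned) {
      String target = null;
      for (Map.Entry<String, Integer> entry : load.entrySet()) {
        if (target == null || entry.getValue() < load.get(target)) {
          target = entry.getKey();
        }
      }
      assignment.put(task, target);
      load.put(target, load.get(target) + 1);
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<String> tasks = Arrays.asList("Partition 0", "Partition 1", "Partition 2", "Partition 3");
    Map<String, String> previous = new HashMap<>();
    previous.put("Partition 0", "A");
    previous.put("Partition 1", "A");
    previous.put("Partition 2", "B");
    previous.put("Partition 3", "C"); // processor C has since left the group
    System.out.println(assign(tasks, previous, Arrays.asList("A", "B", "D")));
    // prints {Partition 0=A, Partition 1=A, Partition 2=B, Partition 3=D}
  }
}
```

In the example only Partition 3 moves, because its previous processor C is no longer live. The cap trades some stickiness for balance: without it, processors that survive a scale-out would keep all of their old tasks and newly added processors would stay idle.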
...
- Modify the existing interfaces and classes as per the proposed solution.
- Add unit tests to validate compatibility and functional correctness.
- Add an integration test in the Samza standalone samples to verify the host affinity feature.
- Verify compatibility: Jackson, a Java serialization/deserialization library, is used to convert Samza data model objects to JSON and back. After the containerId field is removed from ContainerModel, verify that ContainerModel data serialized with the old spec can still be deserialized with the new ContainerModel spec.
- Some TaskNameGrouper implementations (for instance, GroupByContainerCount) assume that the containerId in ContainerModel is a comparable integer. Modify the existing TaskNameGrouper implementations to take a collection of string processorIds instead of assuming that containerId is an integer in the interval [0, N-1], without changing their functionality.
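As an illustration of the last item, a grouper that accepts arbitrary string processor ids might look like the minimal sketch below. The class and method names are hypothetical and do not reflect Samza's TaskNameGrouper interface; the sketch only shows that a simple round-robin balancing policy can be expressed over sorted string ids rather than integer container ids in [0, N-1].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not Samza's TaskNameGrouper API: round-robin grouping of task names
// over arbitrary string processor ids, with no assumption that ids are integers in [0, N-1].
class ProcessorIdGroupingSketch {

  static Map<String, List<String>> group(Collection<String> taskNames,
                                         Collection<String> processorIds) {
    // TreeSet both deduplicates and sorts, so the grouping is deterministic for a given input.
    List<String> processors = new ArrayList<>(new TreeSet<>(processorIds));
    if (processors.isEmpty()) {
      throw new IllegalArgumentException("at least one processor id is required");
    }
    List<String> tasks = new ArrayList<>(new TreeSet<>(taskNames));

    Map<String, List<String>> groups = new TreeMap<>();
    for (String processor : processors) {
      groups.put(processor, new ArrayList<>());
    }
    // Task i goes to the (i mod N)-th processor in sorted order.
    for (int i = 0; i < tasks.size(); i++) {
      groups.get(processors.get(i % processors.size())).add(tasks.get(i));
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> tasks = Arrays.asList("Partition 0", "Partition 1", "Partition 2",
                                       "Partition 3", "Partition 4");
    System.out.println(group(tasks, Arrays.asList("proc-b", "proc-a")));
    // prints {proc-a=[Partition 0, Partition 2, Partition 4], proc-b=[Partition 1, Partition 3]}
  }
}
```

Both collections are sorted lexicographically, so "Partition 10" would sort before "Partition 2"; an implementation that must match an existing grouper's output exactly would need to reproduce that grouper's ordering.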
Rejected Alternatives
Approach 1
This approach contains all the changes in the proposed solution, but with the differing interface changes listed below. ...
- Would require boilerplate LocalityManager implementations for every new execution environment integrated with standalone. For instance, Azure integration would require building an AzureTableLocalityManager and wiring it into TaskNameGrouper. Ideally, a TaskNameGrouper implementation should not be aware of the underlying storage layer used to persist the JobModel.
- Any TaskNameGrouper implementation could hold references to LocalityManager (a live object) and build object hierarchies on top of that reference. This would blur the ownership of LocalityManager and could create an unintentional resource leak.
Approach 2
GroupByContainerIds is the only TaskNameGrouper currently supported in standalone. Implement the host-aware assignment of tasks to stream processors for standalone in GroupByContainerIds. ...
- Ideally, any grouper should be usable in both the YARN and standalone deployment models. With this approach, custom groupers cannot be supported in standalone, so standalone loses the extensibility available in YARN and much of its value proposition.
Approach 3
Do not change any existing interfaces; instead, pass the previous generation's ContainerModel and TaskModels to TaskNameGrouper implementations through the config object and document this in the interface contract. Cons: ...