Thanks for all the follow-up, it definitely helped solidify my
understanding of things a bit. Given that, I set about confirming for
myself that running multiple instances of the StreamApplication in
different Pods, provided an appropriate Partition count in Kafka, would
result in different Containers processing messages.

After fixing some bugs in my repo, I confirmed it! However, it did
result in a few questions/possible issues I found.

1. When using a KVSerde<String, Integer> on the input stream,
  deserializing messages always failed. I was producing messages with
  the console producer so maybe I just did it wrong? I thought I set
  the appropriate --property flags. Sending String-only messages with
  a StringSerde worked fine. Details in the stack trace[1].

2. When attempting to use one topic as both input and output, the serdes
  need to match. However, a non-KVSerde never seems to match itself;
  that is, `getInputStream()` and `getOutputStream()` both passed the
  same topic and a StringSerde still fail the precondition. Perhaps
  this is by design.

3. Including samza-log4j now includes samza-core_2.12 which causes
  runtime failures when directly using samza-core_2.11. I only mention
  this because I cribbed most of my dependencies from hello-samza.

4. I receive warnings about PosixCommandBasedStatisticsGetter failing
  because `ps` returned null. I'm not sure if this is a
  misconfiguration on my part or if it's just not designed to be used
  inside a Docker container.

I'm sure I'll have more as time goes on. Thanks for all the help so far!


Jagadish Venkatraman <> writes:

Hey Tom,

As promised, here's the link to the repository: https://github.

I just reviewed your repo for Kubernetes integration. Really nice work on
integrating the high-level API and Kubernetes with
the ZkJobCoordinator!!

Were you also able to spawn multiple instances that share partitions among
themselves using Zk?

 I don't see a great story for operators that need to, e.g., make database
calls for each message or perform another blocking operation.

You are right. While the high-level API does not support async-calls yet,
you can use multi-threading to achieve parallelism.
Please set *job.container.threadpool.size* to your desired thread-pool
size. Ideally, you will have threads equal to the number of
tasks. Messages on different tasks will be processed concurrently by this
pool of threads.
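
The threading model described here can be pictured with a small stand-alone
sketch. It uses only java.util.concurrent and is not Samza's actual run loop;
the class name TaskPoolSketch, the method runAll, and the task and message
names are all made up for illustration. The pool gets one thread per task,
different tasks run concurrently, and each task works through its own
messages one at a time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TaskPoolSketch {

    // Runs every task's messages on a shared pool. Tasks run concurrently,
    // but each task handles its own messages sequentially, in input order.
    static Map<String, List<String>> runAll(Map<String, List<String>> messagesByTask)
            throws Exception {
        // One thread per task (at least one, so an empty input is still valid).
        int threads = Math.max(1, messagesByTask.size());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Map<String, Future<List<String>>> pending = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, List<String>> entry : messagesByTask.entrySet()) {
                String task = entry.getKey();
                List<String> messages = entry.getValue();
                pending.put(task, pool.submit(() -> {
                    List<String> processed = new ArrayList<>();
                    for (String message : messages) {
                        // Stand-in for a blocking call made while processing.
                        Thread.sleep(10);
                        processed.add(task + ":" + message);
                    }
                    return processed;
                }));
            }
            // Collect each task's results once its work has finished.
            Map<String, List<String>> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<List<String>>> entry : pending.entrySet()) {
                results.put(entry.getKey(), entry.getValue().get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, List<String>> input = new LinkedHashMap<>();
        input.put("task-0", List.of("a", "b", "c"));
        input.put("task-1", List.of("x", "y"));
        System.out.println(runAll(input));
    }
}
```

In this sketch a slow message only delays later messages on the same task;
the other task keeps going on its own thread.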

Please note that while the API is multi-threaded, it is still synchronous -
i.e., a message is delivered to a task only after the previous
process() call returns for that task. This guarantees in-order delivery of
messages and allows one in-flight message per-task.
If you do not care about this guarantee, you can have multiple in-flight
messages per-task by configuring a higher value of

The section "Your Job Image" covers my remaining questions on the low-level

For the section on the low-level API, can you use
It basically creates a new StreamProcessor and runs it. Remember to provide
task.class and set it to your implementation of
StreamTask or AsyncStreamTask. Please note that this is an evolving API and
hence, subject to change.

The nice thing is that you will not need your *KubernetesJob* that wires up
your implementation to K8s.

Please let me know if this solution works for you. As an aside, it would be
great to have your example added to our
open-source code-base with a tutorial on how to use high-level API and
Kubernetes. I'd be
happy to help with design/code-reviews.

On Sun, Jan 28, 2018 at 12:17 PM, Tom Davis <> wrote:

As promised, here's the link to the repository:

The section "Your Job Image" covers my remaining questions on the
low-level API. We use Clojure on the backend, so I'm using that to
sanity-check the example high-level app and will update the example if
it turns out I made any goofs!

After looking at more code, I believe I better understand how the
high-level API functions: it basically makes StreamTask-equivalent
objects for every operator (map, etc.) which eventually get run by a JCL
(via Container) created by StreamProcessor which execute them in a
single-thread pool. There doesn't seem to be an `AsyncStreamTask`
equivalent for these operators, though. Although
`LocalApplicationRunner#createStreamProcessor` has the ability to handle
`AsyncTaskFactory`, `TaskFactoryUtil#createTaskFactory` only returns
`StreamTaskFactory` when passed a `StreamApplication`. The crux being: I
don't see a great story for operators that need to, e.g., make database
calls for each message or perform another blocking operation.

Any clarification on these two topics would be much appreciated!



Jagadish Venkatraman <> writes:


Hi Tom,

Thank you for your feedback on Samza's architecture. Pluggability has been a
differentiator that has enabled us to support a wide range of use-cases -
from stand-alone
deployments to managed services, from streaming to batch inputs and
integrations with
various systems from Kafka, Kinesis, Azure to ElasticSearch.

Thanks for your ideas on integrating Samza and Kubernetes. Let me
intuition a bit more.

The following four aspects are key to running Samza with any environment.

1. Liveness detection/monitoring: This provides a means for discovering
currently available
processors in the group and discovering when a processor is no longer
running. The different
JC implementations we have rely on Zk, Yarn or AzureBlobStore for liveness
detection.

2. Partition-assignment/coordination: Once there is agreement on the
available processors,
this is just a matter of computing assignments.

Usually, (1) and (2) will require you to identify each processor and to
agree on the available
processors in the group. For example, when the ClusterBasedJC starts a
container, it
is assigned a durable ID.

3. Resource management: This focusses on whether you want your containers
to be managed / started by Samza itself or have something external to
Samza start them. While
the former allows you to run a managed service, the latter allows for more
flexibility in your
deployment environments. We use both models heavily at LinkedIn.

As an example, the ClusterBasedJC requests resources from YARN and starts
containers itself. The ZkBasedJC assumes a more general deployment model
and allows
containers to be started externally and relies on Samza only for (1) and (2).

4. Auto-scaling: Here again, you can build auto-scaling right into Samza
if there's support for resource management, or do it externally.
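
To make aspects (1) and (2) a little more concrete, here is a minimal sketch
of computing an assignment from an agreed set of live processors. It is an
illustration only, not Samza's actual grouping logic (which assigns tasks to
containers); the class AssignmentSketch, the method assign, and the pod IDs
are hypothetical. The point is that once every processor sees the same
membership, each can derive the same assignment, and a change in membership
simply means recomputing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class AssignmentSketch {

    // Spreads partitions 0..partitionCount-1 round-robin over the live
    // processors. Iterating the IDs in sorted order means every processor
    // computes the same result from the same membership view.
    static Map<String, List<Integer>> assign(int partitionCount,
                                             SortedSet<String> liveProcessors) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        if (liveProcessors.isEmpty()) {
            return assignment; // nobody is alive, so nothing can be assigned
        }
        List<String> ids = new ArrayList<>(liveProcessors);
        for (String id : ids) {
            assignment.put(id, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = ids.get(partition % ids.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // A single live processor owns all four partitions.
        System.out.println(assign(4, new TreeSet<>(List.of("pod-0"))));
        // After scaling out to four processors, each owns one partition.
        System.out.println(assign(4,
                new TreeSet<>(List.of("pod-0", "pod-1", "pod-2", "pod-3"))));
    }
}
```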

Having said this, you can implement this integration with Kubernetes at
depending on how we choose to tackle the above aspects.

">> My intuition is that I need a JobCoordinator/Factory in the form of a
server that sets up Watches on the appropriate Kubernetes resources so
that when I perform the action in [4.1] *something* happens. "

This alternative does seem more complex. Hence, I would not go down this
path as a first step.

For a start, I would lean on the side of simplicity and recommend the
following solution:
- Configure your Samza job to leverage the existing ZkBasedJC.
- Start multiple instances of your job by running the ** script.

I believe Kubernetes
has good support for this as well.
- Configure Kubernetes to auto-scale your instances on-demand depending on
- As new instances join and leave, Samza will automatically re-distribute
among them.

Additionally, we would be thrilled if you could contribute your learnings
back to the
community - in the form of a blog-post / documentation to Samza itself on
running with

Please let us know should you need any further help. Here's an example to
get you started:


On Sat, Jan 27, 2018 at 8:54 AM, Tom Davis <> wrote:

Hi there! First off, thanks for the continued work on Samza -- I looked
into many DC/stream processors and Samza was a real standout with its
smart architecture and pluggable design.

I'm working on a custom StreamJob/Factory for running Samza jobs on
Kubernetes. Those two classes are pretty straightforward: I create a
Deployment in Kubernetes with the appropriate number of Pods (a number
<= the number of Kafka partitions I created the input topic with). Now
I'm moving onto what executes in the actual Docker containers and I'm a
bit confused.

My plan was to mirror as much as possible what the YarnJob does,
which is to set up an environment that will work with ``. However,
I don't need ClusterBasedJobCoordinator because Kubernetes is not an
offer-based resource negotiator; if the JobCoordinator is running it
means, by definition, it received the appropriate resources. So a
PassThroughJobCoordinator with appropriate main() method seemed like the
ticket. Unfortunately, the PTJC doesn't actually seem to *do* anything
-- unlike the CBJC which has a run-loop and presumably schedules
containers and the like.

I saw the preview documentation on flexible deployment, but it didn't
totally click for me. Perhaps because it was also my first introduction
to the high-level API? (I've just been writing StreamTask impls)

Here's a brief description of the workflow I'm envisioning, perhaps
someone could tell me the classes I should implement and what sorts of
containers I might need running in the environment to coordinate

1. I create a topic in Kafka with N partitions
2. I start a job configured to run N-X containers
 2.1. If my topic has 4 partitions and I have low load, I might want
      X to start at 3 so I only have 1 task instance
3. Samza is configured to send all partitions to task instance 1
4. Later, load increases.
 4.1. I use Kubernetes to scale the job to 4 pods/containers
 4.2. Samza re-configures such that the new containers receive work

My intuition is that I need a JobCoordinator/Factory in the form of a
server that sets up Watches on the appropriate Kubernetes resources so
that when I perform the action in [4.1] *something* happens. Or perhaps
I should use ZkJobCoordinator? Presumably as pods/containers come and go
they will cause changes in ZK that will trigger Task restarts or
whatever logic the coordinator employs?

Okay, I'll stop rambling now. Thanks in advance for any tips!

- Tom
