>> When using a KVSerde<String, Integer> on the input stream,
  deserializing messages always failed. I was producing messages with
  the console producer so maybe I just did it wrong?

From the stack-trace, it looks like you are getting a
"BufferUnderflowException". This is usually caused when Samza tries to read
4 bytes from your serialized message but fewer than 4 bytes remain, which
indicates a malformed integer message written to Kafka. One likely culprit:
the console producer serializes values as UTF-8 strings by default, so a
message typed as "42" is written as two bytes, not the 4-byte encoding an
IntegerSerde expects.

You can verify this by consuming the topic from its oldest offsets using a
KafkaConsumer configured with Kafka's IntegerDeserializer (the client-side
equivalent of an IntegerSerde).
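
Here's a minimal sketch (broker address and topic name are placeholders);
the IntegerDeserializer insists on exactly 4 bytes, so a malformed record
will surface as a SerializationException from poll():

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class VerifyIntegerTopic {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "verify-integers");
        // start from the oldest offsets
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");

        try (KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("my-input-topic"));
          // on pre-2.0 kafka-clients, use consumer.poll(5000) instead
          for (ConsumerRecord<String, Integer> record :
              consumer.poll(Duration.ofSeconds(5))) {
            System.out.printf("offset=%d value=%d%n",
                record.offset(), record.value());
          }
        }
      }
    }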

>> When attempting to use one topic as both input and output, the serdes
  need to match.

Wouldn't this create a loop in your job? Usually, it's safer to have
separate topics for input and output. Separate topics are also easier to
manage and give you lower catch-up times if you are replaying/rewinding
your inputs.
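
For what it's worth, here is a minimal sketch of separate input and output
topics with the high-level API (0.14-era signatures; the topic names and
app class are placeholders):

    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.config.Config;
    import org.apache.samza.operators.MessageStream;
    import org.apache.samza.operators.OutputStream;
    import org.apache.samza.operators.StreamGraph;
    import org.apache.samza.serializers.StringSerde;

    public class EchoApp implements StreamApplication {
      @Override
      public void init(StreamGraph graph, Config config) {
        // separate topics: no feedback loop, and no serde-matching
        // precondition to trip over
        MessageStream<String> input =
            graph.getInputStream("my-input", new StringSerde("UTF-8"));
        OutputStream<String> output =
            graph.getOutputStream("my-output", new StringSerde("UTF-8"));
        input.map(String::toUpperCase).sendTo(output);
      }
    }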

>> Including samza-log4j now includes samza-core_2.12 which causes
  runtime failures when directly using samza-core_2.11.

Usually, you'd want every Scala-versioned library on your class-path to be
compiled against the same Scala version, either 2.11 or 2.12. Samza uses
*scalac* for joint compilation of its Java and Scala sources, so binaries
compiled against 2.11 and 2.12 are not drop-in substitutes for each other.
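
If you are building with Maven as hello-samza does, one workaround (a
sketch; the version property is whatever your build already uses) is to
exclude the transitive 2.12 core:

    <dependency>
      <groupId>org.apache.samza</groupId>
      <artifactId>samza-log4j</artifactId>
      <version>${samza.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-core_2.12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>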

>>  I receive warnings about PosixCommandBasedStatisticsGetter failing
  because `ps` returned null. I'm not sure if this is a
  misconfiguration on my part or if it's just not designed to be used
  inside a Docker container.

I'm not sure why the output of `ps` was null in your environment. Is there
a way to verify? One possibility: some minimal Docker base images don't
ship procps, so `ps` may simply be absent inside the container. FWIW, I did
run the command on my OS X and RHEL 6 machines. If this is spamming your
logs and it's not a misconfiguration, we can file a JIRA and address it.
Let us know!
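
A quick sanity check you can run inside the container (a sketch; this is
not necessarily the exact command Samza issues, just a probe for `ps`
itself):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class PsCheck {
      public static void main(String[] args) throws Exception {
        // if procps isn't installed in the image, this throws IOException;
        // if ps runs but prints nothing, no lines come back
        Process p = new ProcessBuilder("ps", "-o", "rss,vsz").start();
        try (BufferedReader r = new BufferedReader(
            new InputStreamReader(p.getInputStream()))) {
          String line;
          while ((line = r.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }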


>> After fixing some bugs in my repo, I confirmed it!

This is awesome! Would you mind adding your Kubernetes example to
hello-samza?

We'd also love it if you could contribute the README from
https://github.com/sonian/samza-kubernetes
to the official Apache Samza documentation. I think it's a valuable
reference for other users in the community running Samza with K8s.

Best,
Jagdish


On Sun, Feb 4, 2018 at 8:34 PM, Tom Davis <t...@recursivedream.com> wrote:

> Thanks for all the follow-up, it definitely helped solidify my
> understanding of things a bit. Given that, I set about confirming for
> myself that running multiple instances of the StreamApplication in
> different Pods, provided an appropriate Partition count in Kafka, would
> result in different Containers processing messages.
>
> After fixing some bugs in my repo, I confirmed it! However, it did
> result in a few questions/possible issues I found.
>
> 1. When using a KVSerde<String, Integer> on the input stream,
>   deserializing messages always failed. I was producing messages with
>   the console producer so maybe I just did it wrong? I thought I set
>   the appropriate --property flags. Sending String-only messages with
>   a StringSerde worked fine. Details in the stack trace[1].
>
> 2. When attempting to use one topic as both input and output, the serdes
>   need to match. However, a non-KVSerde never seems to match itself;
>   that is, `getInputStream()` and `getOutputStream()` both passed the
>   same topic and a StringSerde still fail the precondition. Perhaps
>   this is by design.
>
> 3. Including samza-log4j now includes samza-core_2.12 which causes
>   runtime failures when directly using samza-core_2.11. I only mention
>   this because I cribbed most of my dependencies from hello-samza.
>
> 4. I receive warnings about PosixCommandBasedStatisticsGetter failing
>   because `ps` returned null. I'm not sure if this is a
>   misconfiguration on my part or if it's just not designed to be used
>   inside a Docker container.
>
> I'm sure I'll have more as time goes on. Thanks for all the help so far!
>
> [1]: https://gist.github.com/tdavis/3398dc66fecb948eca53af0142a0c331
>
> Jagadish Venkatraman <jagadish1...@gmail.com> writes:
>
>> Hey Tom,
>>
>>> As promised, here's the link to the repository:
>>> https://github.com/sonian/samza-kubernetes
>>
>> I just reviewed your repo for Kubernetes integration. Really nice work on
>> integrating the high-level API and Kubernetes with
>> the ZkJobCoordinator!!
>>
>> Were you also able to spawn multiple instances that share partitions among
>> themselves using Zk?
>>
>>> I don't see a great story for operators that need to, e.g., make
>>> database calls for each message or perform another blocking operation.
>>
>> You are right. While the high-level API does not support async calls yet,
>> you can use multi-threading to achieve parallelism. Please set
>> *job.container.threadpool.size* to your desired thread-pool size; ideally,
>> you will have as many threads as tasks. Messages on different tasks will
>> then be processed concurrently by this pool of threads.
>>
>> Please note that while the API is multi-threaded, it is still synchronous,
>> i.e., a message is delivered to a task only after the previous process()
>> call returns for that task. This guarantees in-order delivery of messages
>> and allows one in-flight message per task. If you do not care about this
>> guarantee, you can allow multiple in-flight messages per task by
>> configuring a higher value of *task.max.concurrency*.
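>>
>> For example (the values here are illustrative, not recommendations):
>>
>>     # ideally, one thread per task
>>     job.container.threadpool.size=16
>>     # raise above 1 only if you can relax per-task ordering
>>     task.max.concurrency=1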
>>
>>> The section "Your Job Image" covers my remaining questions on the
>>> low-level API.
>>
>> For the section on the low-level API, can you use
>> LocalApplicationRunner#runTask()? It basically creates a new
>> StreamProcessor and runs it. Remember to provide task.class and set it to
>> your implementation of StreamTask or AsyncStreamTask. Please note that
>> this is an evolving API and hence subject to change.
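>>
>> Roughly, the wiring would look like this (a sketch against this evolving
>> API; the task class and configs shown are placeholders):
>>
>>     import java.util.HashMap;
>>     import java.util.Map;
>>     import org.apache.samza.config.Config;
>>     import org.apache.samza.config.MapConfig;
>>     import org.apache.samza.runtime.LocalApplicationRunner;
>>
>>     public class TaskRunnerMain {
>>       public static void main(String[] args) {
>>         Map<String, String> map = new HashMap<>();
>>         // point task.class at your StreamTask/AsyncStreamTask impl;
>>         // your input-system and serde configs would also go here
>>         map.put("task.class", "com.example.MyStreamTask");
>>         Config config = new MapConfig(map);
>>         new LocalApplicationRunner(config).runTask();
>>       }
>>     }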
>>
>> The nice thing is that you will not need your *KubernetesJob* that wires
>> up your implementation to K8s.
>>
>> Please let me know if this solution works for you. As an aside, it would
>> be great to have your example added to our open-source code-base with a
>> tutorial on how to use the high-level API and Kubernetes. I'd be happy to
>> help with design/code-reviews.
>>
>> On Sun, Jan 28, 2018 at 12:17 PM, Tom Davis <t...@recursivedream.com>
>> wrote:
>>
>>> As promised, here's the link to the repository:
>>> https://github.com/sonian/samza-kubernetes
>>>
>>> The section "Your Job Image" covers my remaining questions on the
>>> low-level API. We use Clojure on the backend, so I'm using that to
>>> sanity-check the example high-level app and will update the example if
>>> it turns out I made any goofs!
>>>
>>> After looking at more code, I believe I better understand how the
>>> high-level API functions: it basically makes StreamTask-equivalent
>>> objects for every operator (map, etc.) which eventually get run by a JCL
>>> (via Container) created by StreamProcessor, which executes them in a
>>> single-thread pool. There doesn't seem to be an `AsyncStreamTask`
>>> equivalent for these operators, though. Although
>>> `LocalApplicationRunner#createStreamProcessor` has the ability to handle
>>> `AsyncTaskFactory`, `TaskFactoryUtil#createTaskFactory` only returns
>>> `StreamTaskFactory` when passed a `StreamApplication`. The crux being: I
>>> don't see a great story for operators that need to, e.g., make database
>>> calls for each message or perform another blocking operation.
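>>>
>>> For reference, the low-level async contract I mean looks roughly like
>>> this (a sketch; DbLookupTask and the async client are placeholders):
>>>
>>>     import org.apache.samza.system.IncomingMessageEnvelope;
>>>     import org.apache.samza.task.AsyncStreamTask;
>>>     import org.apache.samza.task.MessageCollector;
>>>     import org.apache.samza.task.TaskCallback;
>>>     import org.apache.samza.task.TaskCoordinator;
>>>
>>>     public class DbLookupTask implements AsyncStreamTask {
>>>       @Override
>>>       public void processAsync(IncomingMessageEnvelope envelope,
>>>           MessageCollector collector, TaskCoordinator coordinator,
>>>           TaskCallback callback) {
>>>         // dbClient stands in for an async client returning a
>>>         // CompletableFuture; complete the callback when the call finishes
>>>         dbClient.lookupAsync(envelope.getMessage())
>>>             .whenComplete((result, err) -> {
>>>               if (err != null) {
>>>                 callback.failure(err);
>>>               } else {
>>>                 callback.complete();
>>>               }
>>>             });
>>>       }
>>>     }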
>>>
>>> Any clarification on these two topics would be much appreciated!
>>>
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>> Jagadish Venkatraman <jagadish1...@gmail.com> writes:
>>>
>>>> +Yi
>>>>
>>>> Hi Tom,
>>>>
>>>> Thank you for your feedback on Samza's architecture. Pluggability has
>>>> been a differentiator that has enabled us to support a wide range of
>>>> use-cases - from stand-alone deployments to managed services, from
>>>> streaming to batch inputs, and integrations with various systems, from
>>>> Kafka, Kinesis, and Azure to Elasticsearch.
>>>>
>>>> Thanks for your ideas on integrating Samza and Kubernetes. Let me
>>>> formalize your intuition a bit more.
>>>>
>>>> The following four aspects are key to running Samza in any
>>>> environment.
>>>>
>>>> 1. Liveness detection/monitoring: This provides a means for
>>>> discovering the currently available processors in the group and for
>>>> detecting when a processor is no longer running. The different JC
>>>> implementations we have rely on Zk, Yarn or AzureBlobStore for
>>>> liveness detection.
>>>>
>>>> 2. Partition-assignment/coordination: Once there is agreement on the
>>>> available processors, this is just a matter of computing assignments.
>>>>
>>>> Usually, (1) and (2) will require you to identify each processor and
>>>> to agree on the available processors in the group. For example, when
>>>> the ClusterBasedJC starts a container, it is assigned a durable ID.
>>>>
>>>> 3. Resource management: This focuses on whether you want your
>>>> containers to be managed/started by Samza itself or have something
>>>> external to Samza start them. While the former allows you to run a
>>>> managed service, the latter allows for more flexibility in your
>>>> deployment environments. We use both models heavily at LinkedIn.
>>>>
>>>> As an example, the ClusterBasedJC requests resources from YARN and
>>>> starts the containers itself. The ZkBasedJC assumes a more general
>>>> deployment model, allows containers to be started externally, and
>>>> relies on Samza only for (1) and (2).
>>>>
>>>> 4. Auto-scaling: Here again, you can build auto-scaling right into
>>>> Samza if there's support for resource management, or do it externally.
>>>>
>>>> Having said this, you can implement this integration with Kubernetes
>>>> at multiple levels depending on how you choose to tackle the above
>>>> aspects.
>>>>
>>>> ">> My intuition is that I need a JobCoordinator/Factory in the form of
>>>> a
>>>> server that sets up Watches on the appropriate Kubernetes resources so
>>>> that when I perform the action in [4.1] *something* happens. "
>>>>
>>>> This alternative does seem more complex. Hence, I would not go down
>>>> this path as the first step.
>>>>
>>>> For a start, I would lean on the side of simplicity and recommend the
>>>> following solution (a minimal config sketch follows this list):
>>>> - Configure your Samza job to leverage the existing ZkBasedJC.
>>>> - Start multiple instances of your job by running the *run-app.sh*
>>>> script. I believe Kubernetes has good support for this as well.
>>>> - Configure Kubernetes to auto-scale your instances on-demand
>>>> depending on load.
>>>> - As new instances join and leave, Samza will automatically
>>>> re-distribute partitions among them.
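>>>>
>>>> Something like this (the ZooKeeper address is a placeholder; please
>>>> double-check the keys against the flexible-deployment preview docs):
>>>>
>>>>     job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>>>>     job.coordinator.zk.connect=my-zookeeper:2181
>>>>     task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory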
>>>>
>>>> Additionally, we would be thrilled if you could contribute your
>>>> learnings back to the community - in the form of a blog-post or
>>>> documentation to Samza itself on running with Kubernetes.
>>>>
>>>> Please let us know should you need any further help. Here's an example
>>>> to get you started:
>>>> https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application
>>>>
>>>> Best,
>>>> Jagdish
>>>>
>>>> On Sat, Jan 27, 2018 at 8:54 AM, Tom Davis <t...@recursivedream.com>
>>>> wrote:
>>>>
>>>>> Hi there! First off, thanks for the continued work on Samza -- I looked
>>>>> into many DC/stream processors and Samza was a real standout with its
>>>>> smart architecture and pluggable design.
>>>>>
>>>>> I'm working on a custom StreamJob/Factory for running Samza jobs on
>>>>> Kubernetes. Those two classes are pretty straightforward: I create a
>>>>> Deployment in Kubernetes with the appropriate number of Pods (a number
>>>>> <= the number of Kafka partitions I created the input topic with). Now
>>>>> I'm moving on to what executes in the actual Docker containers and I'm a
>>>>> bit confused.
>>>>>
>>>>> My plan was to mirror as much as possible what the YarnJob does,
>>>>> which is to set up an environment that will work with `run-jc.sh`.
>>>>> However, I don't need ClusterBasedJobCoordinator because Kubernetes is
>>>>> not an offer-based resource negotiator; if the JobCoordinator is
>>>>> running, it means, by definition, it received the appropriate
>>>>> resources. So a PassThroughJobCoordinator with an appropriate main()
>>>>> method seemed like the ticket. Unfortunately, the PTJC doesn't
>>>>> actually seem to *do* anything -- unlike the CBJC, which has a
>>>>> run-loop and presumably schedules containers and the like.
>>>>>
>>>>> I saw the preview documentation on flexible deployment, but it didn't
>>>>> totally click for me. Perhaps because it was also my first introduction
>>>>> to the high-level API? (I've just been writing StreamTask impls)
>>>>>
>>>>> Here's a brief description of the workflow I'm envisioning, perhaps
>>>>> someone could tell me the classes I should implement and what sorts of
>>>>> containers I might need running in the environment to coordinate
>>>>> everything?
>>>>>
>>>>> 1. I create a topic in Kafka with N partitions
>>>>> 2. I start a job configured to run N-X containers
>>>>>  2.1. If my topic has 4 partitions and I have low load, I might want
>>>>>       X to start at 3 so I only have 1 task instance
>>>>> 3. Samza is configured to send all partitions to task instance 1
>>>>> 4. Later, load increases.
>>>>>  4.1. I use Kubernetes to scale the job to 4 pods/containers
>>>>>  4.2. Samza re-configures such that the new containers receive work
>>>>>
>>>>> My intuition is that I need a JobCoordinator/Factory in the form of a
>>>>> server that sets up Watches on the appropriate Kubernetes resources so
>>>>> that when I perform the action in [4.1] *something* happens. Or perhaps
>>>>> I should use ZkJobCoordinator? Presumably as pods/containers come and
>>>>> go, they will cause changes in ZK that will trigger Task restarts or
>>>>> whatever logic the coordinator employs?
>>>>>
>>>>> Okay, I'll stop rambling now. Thanks in advance for any tips!
>>>>>
>>>>> - Tom
>>>>>
>>>>>
>>>>>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
