>> When using a KVSerde<String, Integer> on the input stream,
>> deserializing messages always failed. I was producing messages with
>> the console producer so maybe I just did it wrong?

From the stack-trace, it looks like you are getting a
"BufferUnderflowException". This usually happens when Samza tries to
read 4 bytes from your serialized message but fewer than 4 bytes
remain, which indicates a malformed integer message was written to
Kafka. (As far as I know, the console producer sends each line as its
raw UTF-8 bytes, so typing "42" produces two bytes rather than a
4-byte binary integer.) You can verify this by consuming the topic
from its oldest offset using a KafkaConsumer configured with an
integer deserializer.
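For example, here is a minimal sketch of that check (the broker
address and topic name are placeholders, and it assumes a recent
kafka-clients on the class-path):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class VerifyIntTopic {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "verify-int-topic");
        props.put("auto.offset.reset", "earliest"); // start from the oldest offset
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        try (KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("my-input-topic")); // placeholder
          // a real check would poll in a loop; one poll keeps the sketch short
          ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofSeconds(5));
          for (ConsumerRecord<String, Integer> record : records) {
            System.out.println(record.offset() + " -> " + record.value());
          }
        }
      }
    }

If poll() throws a SerializationException complaining that the data
size is not 4, the producer wrote something other than 4-byte
integers, and the problem is on the producing side rather than in
your Samza job.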
>> When attempting to use one topic as both input and output, the
>> serdes need to match.

Wouldn't this create a loop in your job? It's usually safer to have
separate topics for input and output. Separate topics also make the
job easier to manage and keep catch-up times lower if you are
replaying/rewinding your inputs.

>> Including samza-log4j now includes samza-core_2.12 which causes
>> runtime failures when directly using samza-core_2.11.

You'd want every library on your class-path compiled against the same
Scala version, either 2.11 or 2.12. Samza uses *scalac* for joint
compilation of its Java and Scala sources, so binaries compiled
against 2.11 are not drop-in substitutes for binaries compiled
against 2.12, and vice versa.
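If the 2.12 core is coming in transitively, one fix is a Maven
exclusion. A sketch, with illustrative coordinates (your samza-log4j
artifact may carry its own Scala suffix depending on the version, so
check what `mvn dependency:tree` actually resolves):

    <dependency>
      <groupId>org.apache.samza</groupId>
      <artifactId>samza-log4j</artifactId>
      <version>${samza.version}</version>
      <exclusions>
        <!-- keep the 2.12-compiled core off the class-path; the pom
             depends on samza-core_2.11 directly elsewhere -->
        <exclusion>
          <groupId>org.apache.samza</groupId>
          <artifactId>samza-core_2.12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>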
>> I receive warnings about PosixCommandBasedStatisticsGetter failing
>> because `ps` returned null. I'm not sure if this is a
>> misconfiguration on my part or if it's just not designed to be used
>> inside a Docker container.

I'm not sure why the output of `ps` was null in your environment. Is
there a way to verify? Fwiw, I did run the command on my OS X and
RHEL 6 machines. If this is spamming your logs and it's not a
misconfiguration, we can file a JIRA and address it. Let us know!

>> After fixing some bugs in my repo, I confirmed it!

This is awesome! Would you mind adding your Kubernetes example to
hello-samza? We'd also love it if you could contribute the README
from https://github.com/sonian/samza-kubernetes to the official
Apache Samza documentation. I think it will be a very useful
reference for other users in the community running Samza with K8s.

Best,
Jagdish

On Sun, Feb 4, 2018 at 8:34 PM, Tom Davis <t...@recursivedream.com> wrote:

> Thanks for all the follow-up, it definitely helped solidify my
> understanding of things a bit. Given that, I set about confirming
> for myself that running multiple instances of the StreamApplication
> in different Pods, provided an appropriate partition count in Kafka,
> would result in different Containers processing messages.
>
> After fixing some bugs in my repo, I confirmed it! However, it did
> result in a few questions/possible issues I found.
>
> 1. When using a KVSerde<String, Integer> on the input stream,
>    deserializing messages always failed. I was producing messages
>    with the console producer so maybe I just did it wrong? I thought
>    I set the appropriate --property flags. Sending String-only
>    messages with a StringSerde worked fine. Details in the stack
>    trace[1].
>
> 2. When attempting to use one topic as both input and output, the
>    serdes need to match. However, a non-KVSerde never seems to match
>    itself; that is, `getInputStream()` and `getOutputStream()` both
>    passed the same topic and a StringSerde still fail the
>    precondition. Perhaps this is by design.
>
> 3. Including samza-log4j now includes samza-core_2.12, which causes
>    runtime failures when directly using samza-core_2.11. I only
>    mention this because I cribbed most of my dependencies from
>    hello-samza.
>
> 4. I receive warnings about PosixCommandBasedStatisticsGetter
>    failing because `ps` returned null. I'm not sure if this is a
>    misconfiguration on my part or if it's just not designed to be
>    used inside a Docker container.
>
> I'm sure I'll have more as time goes on. Thanks for all the help so
> far!
>
> [1]: https://gist.github.com/tdavis/3398dc66fecb948eca53af0142a0c331
>
> Jagadish Venkatraman <jagadish1...@gmail.com> writes:
>
>> Hey Tom,
>>
>> > As promised, here's the link to the repository:
>> > https://github.com/sonian/samza-kubernetes
>>
>> I just reviewed your repo for Kubernetes integration. Really nice
>> work on integrating the high-level API and Kubernetes with the
>> ZkJobCoordinator!!
>>
>> Were you also able to spawn multiple instances that share
>> partitions among themselves using Zk?
>>
>> > I don't see a great story for operators that need to, e.g., make
>> > database calls for each message or perform another blocking
>> > operation.
>>
>> You are right. While the high-level API does not support async
>> calls yet, you can use multi-threading to achieve parallelism.
>> Please set *job.container.threadpool.size* to your desired
>> thread-pool size. Ideally, you will have as many threads as tasks.
>> Messages on different tasks will be processed concurrently by this
>> pool of threads.
>>
>> Please note that while the API is multi-threaded, it is still
>> synchronous - i.e., a message is delivered to a task only after the
>> previous process() call for that task returns. This guarantees
>> in-order delivery of messages and allows one in-flight message per
>> task. If you do not care about this guarantee, you can allow
>> multiple in-flight messages per task by configuring a higher value
>> of task.max.concurrency.
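>> For instance, a sketch (the sizes are illustrative; a job with four
>> tasks gets one thread per task, and leaving task.max.concurrency at
>> its default of 1 keeps the one-in-flight guarantee):
>>
>>   # process messages from different tasks on a pool of 4 threads
>>   job.container.threadpool.size=4
>>   # optional: trade in-order per-task delivery for more parallelism
>>   # task.max.concurrency=2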
>> > The section "Your Job Image" covers my remaining questions on the
>> > low-level API.
>>
>> For the section on the low-level API, can you use
>> LocalApplicationRunner#runTask()? It basically creates a new
>> StreamProcessor and runs it. Remember to provide task.class and set
>> it to your implementation of StreamTask or AsyncStreamTask. Please
>> note that this is an evolving API and hence subject to change.
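>> Roughly, a sketch (the class and stream names are placeholders, and
>> since the API is evolving the exact entry point may differ in your
>> version):
>>
>>   # point the runner at your low-level task implementation
>>   task.class=com.example.MyStreamTask
>>   task.inputs=kafka.my-input-topic
>>
>> followed by something like
>> `new LocalApplicationRunner(config).runTask()` from your main().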
>> The nice thing is that you will not need your *KubernetesJob* that
>> wires up your implementation to K8s.
>>
>> Please let me know if this solution works for you. As an aside, it
>> would be great to have your example added to our open-source
>> code-base with a tutorial on how to use the high-level API and
>> Kubernetes. I'd be happy to help with design/code-reviews.
>>
>> On Sun, Jan 28, 2018 at 12:17 PM, Tom Davis <t...@recursivedream.com>
>> wrote:
>>
>>> As promised, here's the link to the repository:
>>> https://github.com/sonian/samza-kubernetes
>>>
>>> The section "Your Job Image" covers my remaining questions on the
>>> low-level API. We use Clojure on the backend, so I'm using that to
>>> sanity-check the example high-level app and will update the
>>> example if it turns out I made any goofs!
>>>
>>> After looking at more code, I believe I better understand how the
>>> high-level API functions: it basically makes StreamTask-equivalent
>>> objects for every operator (map, etc.) which eventually get run by
>>> a JCL (via Container) created by StreamProcessor, which executes
>>> them in a single-thread pool. There doesn't seem to be an
>>> `AsyncStreamTask` equivalent for these operators, though. Although
>>> `LocalApplicationRunner#createStreamProcessor` has the ability to
>>> handle `AsyncTaskFactory`, `TaskFactoryUtil#createTaskFactory`
>>> only returns `StreamTaskFactory` when passed a
>>> `StreamApplication`. The crux being: I don't see a great story for
>>> operators that need to, e.g., make database calls for each message
>>> or perform another blocking operation.
>>>
>>> Any clarification on these two topics would be much appreciated!
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>> Jagadish Venkatraman <jagadish1...@gmail.com> writes:
>>>
>>>> +Yi
>>>>
>>>> Hi Tom,
>>>>
>>>> Thank you for your feedback on Samza's architecture. Pluggability
>>>> has been a differentiator that has enabled us to support a wide
>>>> range of use-cases - from stand-alone deployments to managed
>>>> services, from streaming to batch inputs, and integrations with
>>>> various systems from Kafka, Kinesis, and Azure to ElasticSearch.
>>>>
>>>> Thanks for your ideas on integrating Samza and Kubernetes. Let me
>>>> formalize your intuition a bit more.
>>>>
>>>> The following four aspects are key to running Samza in any
>>>> environment.
>>>>
>>>> 1. Liveness detection/monitoring: This provides a means for
>>>>    discovering the currently available processors in the group
>>>>    and for discovering when a processor is no longer running. The
>>>>    different JC implementations we have rely on Zk, Yarn or
>>>>    AzureBlobStore for liveness detection.
>>>>
>>>> 2. Partition-assignment/coordination: Once there is agreement on
>>>>    the available processors, this is just a matter of computing
>>>>    assignments.
>>>>
>>>> Usually, (1) and (2) will require you to identify each processor
>>>> and to agree on the available processors in the group. For
>>>> example, when the ClusterBasedJC starts a container, it is
>>>> assigned a durable ID.
>>>>
>>>> 3. Resource management: This focuses on whether you want your
>>>>    containers to be managed/started by Samza itself or by
>>>>    something external to Samza. While the former allows you to
>>>>    run a managed service, the latter allows for more flexibility
>>>>    in your deployment environments. We use both models heavily at
>>>>    LinkedIn.
>>>>
>>>> As an example, the ClusterBasedJC requests resources from YARN
>>>> and starts the containers itself. The ZkBasedJC assumes a more
>>>> general deployment model, allows containers to be started
>>>> externally, and relies on Samza only for (1) and (2).
>>>>
>>>> 4. Auto-scaling: Here again, you can build auto-scaling right
>>>>    into Samza if there's support for resource management, or do
>>>>    it externally.
>>>>
>>>> Having said this, you can implement this integration with
>>>> Kubernetes at multiple levels depending on how we choose to
>>>> tackle the above aspects.
>>>>
>>>> "My intuition is that I need a JobCoordinator/Factory in the form
>>>> of a server that sets up Watches on the appropriate Kubernetes
>>>> resources so that when I perform the action in [4.1] *something*
>>>> happens."
>>>>
>>>> This alternative does seem more complex, so I would not go down
>>>> this path as the first step.
>>>>
>>>> For a start, I would lean on the side of simplicity and recommend
>>>> the following solution (a config sketch follows this list):
>>>> - Configure your Samza job to leverage the existing ZkBasedJC.
>>>> - Start multiple instances of your job by running the
>>>>   *run-app.sh* script. I believe Kubernetes has good support for
>>>>   this as well.
>>>> - Configure Kubernetes to auto-scale your instances on-demand
>>>>   depending on load.
>>>> - As new instances join and leave, Samza will automatically
>>>>   re-distribute partitions among them.
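>>>> A minimal config sketch for that setup (the ZooKeeper address is
>>>> a placeholder; the key names follow the standalone/Zk deployment
>>>> docs):
>>>>
>>>>   # coordinate processors via ZooKeeper instead of YARN
>>>>   job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>>>>   job.coordinator.zk.connect=zk-service:2181
>>>>   task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>>>>
>>>> On the Kubernetes side, scaling is then just resizing the
>>>> Deployment, e.g. `kubectl scale deployment my-samza-app
>>>> --replicas=4` or a HorizontalPodAutoscaler; Zk picks up the joins
>>>> and leaves and rebalances the partitions.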
>>>> Additionally, we would be thrilled if you could contribute your
>>>> learnings back to the community - in the form of a blog-post or
>>>> documentation in Samza itself on running with Kubernetes.
>>>>
>>>> Please let us know should you need any further help. Here's an
>>>> example to get you started:
>>>> https://github.com/apache/samza-hello-samza/tree/master/src/main/java/samza/examples/wikipedia/application
>>>>
>>>> Best,
>>>> Jagdish
>>>>
>>>> On Sat, Jan 27, 2018 at 8:54 AM, Tom Davis <t...@recursivedream.com>
>>>> wrote:
>>>>
>>>>> Hi there! First off, thanks for the continued work on Samza -- I
>>>>> looked into many DC/stream processors and Samza was a real
>>>>> standout with its smart architecture and pluggable design.
>>>>>
>>>>> I'm working on a custom StreamJob/Factory for running Samza jobs
>>>>> on Kubernetes. Those two classes are pretty straightforward: I
>>>>> create a Deployment in Kubernetes with the appropriate number of
>>>>> Pods (a number <= the number of Kafka partitions I created the
>>>>> input topic with). Now I'm moving on to what executes in the
>>>>> actual Docker containers, and I'm a bit confused.
>>>>>
>>>>> My plan was to mirror as much as possible what the YarnJob does,
>>>>> which is to set up an environment that will work with
>>>>> `run-jc.sh`. However, I don't need ClusterBasedJobCoordinator
>>>>> because Kubernetes is not an offer-based resource negotiator; if
>>>>> the JobCoordinator is running, it means, by definition, that it
>>>>> received the appropriate resources. So a
>>>>> PassThroughJobCoordinator with an appropriate main() method
>>>>> seemed like the ticket. Unfortunately, the PTJC doesn't actually
>>>>> seem to *do* anything -- unlike the CBJC, which has a run-loop
>>>>> and presumably schedules containers and the like.
>>>>>
>>>>> I saw the preview documentation on flexible deployment, but it
>>>>> didn't totally click for me. Perhaps because it was also my
>>>>> first introduction to the high-level API? (I've just been
>>>>> writing StreamTask impls.)
>>>>>
>>>>> Here's a brief description of the workflow I'm envisioning;
>>>>> perhaps someone could tell me the classes I should implement and
>>>>> what sorts of containers I might need running in the environment
>>>>> to coordinate everything?
>>>>>
>>>>> 1. I create a topic in Kafka with N partitions
>>>>> 2. I start a job configured to run N-X containers
>>>>>    2.1. If my topic has 4 partitions and I have low load, I
>>>>>         might want X to start at 3 so I only have 1 task
>>>>>         instance
>>>>> 3. Samza is configured to send all partitions to task instance 1
>>>>> 4. Later, load increases.
>>>>>    4.1. I use Kubernetes to scale the job to 4 pods/containers
>>>>>    4.2. Samza re-configures such that the new containers receive
>>>>>         work
>>>>>
>>>>> My intuition is that I need a JobCoordinator/Factory in the form
>>>>> of a server that sets up Watches on the appropriate Kubernetes
>>>>> resources so that when I perform the action in [4.1] *something*
>>>>> happens. Or perhaps I should use ZkJobCoordinator? Presumably as
>>>>> pods/containers come and go they will cause changes in ZK that
>>>>> will trigger Task restarts or whatever logic the coordinator
>>>>> employs?
>>>>>
>>>>> Okay, I'll stop rambling now. Thanks in advance for any tips!
>>>>>
>>>>> - Tom

-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University